From c9a9e8eb6a13ac9d1bb6ed44fbdbfc185de1fef0 Mon Sep 17 00:00:00 2001 From: Jeff Ashton Date: Tue, 10 Dec 2019 15:37:23 -0500 Subject: [PATCH 1/3] Adding support for error_output_prefix to s3 destination --- ...ce_aws_kinesis_firehose_delivery_stream.go | 152 ++++++++++-------- ...s_kinesis_firehose_delivery_stream_test.go | 80 +++++++++ ...sis_firehose_delivery_stream.html.markdown | 2 +- 3 files changed, 169 insertions(+), 65 deletions(-) diff --git a/aws/resource_aws_kinesis_firehose_delivery_stream.go b/aws/resource_aws_kinesis_firehose_delivery_stream.go index e1c72d21823..acee056b06d 100644 --- a/aws/resource_aws_kinesis_firehose_delivery_stream.go +++ b/aws/resource_aws_kinesis_firehose_delivery_stream.go @@ -79,6 +79,11 @@ func s3ConfigurationSchema() *schema.Schema { Default: "UNCOMPRESSED", }, + "error_output_prefix": { + Type: schema.TypeString, + Optional: true, + }, + "kms_key_arn": { Type: schema.TypeString, Optional: true, @@ -313,6 +318,7 @@ func flattenFirehoseS3Configuration(description *firehose.S3DestinationDescripti "bucket_arn": aws.StringValue(description.BucketARN), "cloudwatch_logging_options": flattenCloudwatchLoggingOptions(description.CloudWatchLoggingOptions), "compression_format": aws.StringValue(description.CompressionFormat), + "error_output_prefix": aws.StringValue(description.ErrorOutputPrefix), "prefix": aws.StringValue(description.Prefix), "role_arn": aws.StringValue(description.RoleARN), } @@ -1378,7 +1384,7 @@ func createSourceConfig(source map[string]interface{}) *firehose.KinesisStreamSo return configuration } -func createS3Config(d *schema.ResourceData) *firehose.S3DestinationConfiguration { +func createS3Config(d *schema.ResourceData, withErrorOutputPrefix bool) *firehose.S3DestinationConfiguration { s3 := d.Get("s3_configuration").([]interface{})[0].(map[string]interface{}) configuration := &firehose.S3DestinationConfiguration{ @@ -1393,6 +1399,10 @@ func createS3Config(d *schema.ResourceData) *firehose.S3DestinationConfiguration EncryptionConfiguration: extractEncryptionConfiguration(s3), } + if withErrorOutputPrefix { + configuration.ErrorOutputPrefix = extractErrorOutputPrefixConfiguration(s3) + } + if _, ok := s3["cloudwatch_logging_options"]; ok { configuration.CloudWatchLoggingOptions = extractCloudWatchLoggingConfiguration(s3) } @@ -1437,6 +1447,7 @@ func createExtendedS3Config(d *schema.ResourceData) *firehose.ExtendedS3Destinat IntervalInSeconds: aws.Int64(int64(s3["buffer_interval"].(int))), SizeInMBs: aws.Int64(int64(s3["buffer_size"].(int))), }, + ErrorOutputPrefix: extractErrorOutputPrefixConfiguration(s3), Prefix: extractPrefixConfiguration(s3), CompressionFormat: aws.String(s3["compression_format"].(string)), DataFormatConversionConfiguration: expandFirehoseDataFormatConversionConfiguration(s3["data_format_conversion_configuration"].([]interface{})), @@ -1451,10 +1462,6 @@ func createExtendedS3Config(d *schema.ResourceData) *firehose.ExtendedS3Destinat configuration.CloudWatchLoggingOptions = extractCloudWatchLoggingConfiguration(s3) } - if v, ok := s3["error_output_prefix"]; ok && v.(string) != "" { - configuration.ErrorOutputPrefix = aws.String(v.(string)) - } - if s3BackupMode, ok := s3["s3_backup_mode"]; ok { configuration.S3BackupMode = aws.String(s3BackupMode.(string)) configuration.S3BackupConfiguration = expandS3BackupConfig(d.Get("extended_s3_configuration").([]interface{})[0].(map[string]interface{})) @@ -1463,7 +1470,7 @@ func createExtendedS3Config(d *schema.ResourceData) *firehose.ExtendedS3Destinat return 
configuration } -func updateS3Config(d *schema.ResourceData) *firehose.S3DestinationUpdate { +func updateS3Config(d *schema.ResourceData, withErrorOutputPrefix bool) *firehose.S3DestinationUpdate { s3 := d.Get("s3_configuration").([]interface{})[0].(map[string]interface{}) configuration := &firehose.S3DestinationUpdate{ @@ -1479,6 +1486,10 @@ func updateS3Config(d *schema.ResourceData) *firehose.S3DestinationUpdate { CloudWatchLoggingOptions: extractCloudWatchLoggingConfiguration(s3), } + if withErrorOutputPrefix { + configuration.ErrorOutputPrefix = extractErrorOutputPrefixConfiguration(s3) + } + if _, ok := s3["cloudwatch_logging_options"]; ok { configuration.CloudWatchLoggingOptions = extractCloudWatchLoggingConfiguration(s3) } @@ -1524,6 +1535,7 @@ func updateExtendedS3Config(d *schema.ResourceData) *firehose.ExtendedS3Destinat IntervalInSeconds: aws.Int64((int64)(s3["buffer_interval"].(int))), SizeInMBs: aws.Int64((int64)(s3["buffer_size"].(int))), }, + ErrorOutputPrefix: extractErrorOutputPrefixConfiguration(s3), Prefix: extractPrefixConfiguration(s3), CompressionFormat: aws.String(s3["compression_format"].(string)), EncryptionConfiguration: extractEncryptionConfiguration(s3), @@ -1536,10 +1548,6 @@ func updateExtendedS3Config(d *schema.ResourceData) *firehose.ExtendedS3Destinat configuration.CloudWatchLoggingOptions = extractCloudWatchLoggingConfiguration(s3) } - if v, ok := s3["error_output_prefix"]; ok && v.(string) != "" { - configuration.ErrorOutputPrefix = aws.String(v.(string)) - } - if s3BackupMode, ok := s3["s3_backup_mode"]; ok { configuration.S3BackupMode = aws.String(s3BackupMode.(string)) configuration.S3BackupUpdate = updateS3BackupConfig(d.Get("extended_s3_configuration").([]interface{})[0].(map[string]interface{})) @@ -1818,6 +1826,14 @@ func extractCloudWatchLoggingConfiguration(s3 map[string]interface{}) *firehose. 
} +func extractErrorOutputPrefixConfiguration(s3 map[string]interface{}) *string { + if v, ok := s3["error_output_prefix"]; ok { + return aws.String(v.(string)) + } + + return nil +} + func extractPrefixConfiguration(s3 map[string]interface{}) *string { if v, ok := s3["prefix"]; ok { return aws.String(v.(string)) @@ -1826,7 +1842,7 @@ func extractPrefixConfiguration(s3 map[string]interface{}) *string { return nil } -func createRedshiftConfig(d *schema.ResourceData, s3Config *firehose.S3DestinationConfiguration) (*firehose.RedshiftDestinationConfiguration, error) { +func createRedshiftConfig(d *schema.ResourceData) (*firehose.RedshiftDestinationConfiguration, error) { redshiftRaw, ok := d.GetOk("redshift_configuration") if !ok { return nil, fmt.Errorf("Error loading Redshift Configuration for Kinesis Firehose: redshift_configuration not found") @@ -1835,6 +1851,8 @@ func createRedshiftConfig(d *schema.ResourceData, s3Config *firehose.S3Destinati redshift := rl[0].(map[string]interface{}) + s3Config := createS3Config(d, false) + configuration := &firehose.RedshiftDestinationConfiguration{ ClusterJDBCURL: aws.String(redshift["cluster_jdbcurl"].(string)), RetryOptions: extractRedshiftRetryOptions(redshift), @@ -1859,7 +1877,7 @@ func createRedshiftConfig(d *schema.ResourceData, s3Config *firehose.S3Destinati return configuration, nil } -func updateRedshiftConfig(d *schema.ResourceData, s3Update *firehose.S3DestinationUpdate) (*firehose.RedshiftDestinationUpdate, error) { +func updateRedshiftConfig(d *schema.ResourceData) (*firehose.RedshiftDestinationUpdate, error) { redshiftRaw, ok := d.GetOk("redshift_configuration") if !ok { return nil, fmt.Errorf("Error loading Redshift Configuration for Kinesis Firehose: redshift_configuration not found") @@ -1868,6 +1886,8 @@ func updateRedshiftConfig(d *schema.ResourceData, s3Update *firehose.S3Destinati redshift := rl[0].(map[string]interface{}) + s3Update := updateS3Config(d, false) + configuration := &firehose.RedshiftDestinationUpdate{ ClusterJDBCURL: aws.String(redshift["cluster_jdbcurl"].(string)), RetryOptions: extractRedshiftRetryOptions(redshift), @@ -1892,7 +1912,7 @@ func updateRedshiftConfig(d *schema.ResourceData, s3Update *firehose.S3Destinati return configuration, nil } -func createElasticsearchConfig(d *schema.ResourceData, s3Config *firehose.S3DestinationConfiguration) (*firehose.ElasticsearchDestinationConfiguration, error) { +func createElasticsearchConfig(d *schema.ResourceData) (*firehose.ElasticsearchDestinationConfiguration, error) { esConfig, ok := d.GetOk("elasticsearch_configuration") if !ok { return nil, fmt.Errorf("Error loading Elasticsearch Configuration for Kinesis Firehose: elasticsearch_configuration not found") @@ -1901,6 +1921,8 @@ func createElasticsearchConfig(d *schema.ResourceData, s3Config *firehose.S3Dest es := esList[0].(map[string]interface{}) + s3Config := createS3Config(d, true) + config := &firehose.ElasticsearchDestinationConfiguration{ BufferingHints: extractBufferingHints(es), DomainARN: aws.String(es["domain_arn"].(string)), @@ -1929,7 +1951,7 @@ func createElasticsearchConfig(d *schema.ResourceData, s3Config *firehose.S3Dest return config, nil } -func updateElasticsearchConfig(d *schema.ResourceData, s3Update *firehose.S3DestinationUpdate) (*firehose.ElasticsearchDestinationUpdate, error) { +func updateElasticsearchConfig(d *schema.ResourceData) (*firehose.ElasticsearchDestinationUpdate, error) { esConfig, ok := d.GetOk("elasticsearch_configuration") if !ok { return nil, fmt.Errorf("Error loading 
Elasticsearch Configuration for Kinesis Firehose: elasticsearch_configuration not found") @@ -1938,6 +1960,8 @@ func updateElasticsearchConfig(d *schema.ResourceData, s3Update *firehose.S3Dest es := esList[0].(map[string]interface{}) + s3Update := updateS3Config(d, true) + update := &firehose.ElasticsearchDestinationUpdate{ BufferingHints: extractBufferingHints(es), DomainARN: aws.String(es["domain_arn"].(string)), @@ -1963,7 +1987,7 @@ func updateElasticsearchConfig(d *schema.ResourceData, s3Update *firehose.S3Dest return update, nil } -func createSplunkConfig(d *schema.ResourceData, s3Config *firehose.S3DestinationConfiguration) (*firehose.SplunkDestinationConfiguration, error) { +func createSplunkConfig(d *schema.ResourceData) (*firehose.SplunkDestinationConfiguration, error) { splunkRaw, ok := d.GetOk("splunk_configuration") if !ok { return nil, fmt.Errorf("Error loading Splunk Configuration for Kinesis Firehose: splunk_configuration not found") @@ -1972,6 +1996,8 @@ func createSplunkConfig(d *schema.ResourceData, s3Config *firehose.S3Destination splunk := sl[0].(map[string]interface{}) + s3Config := createS3Config(d, true) + configuration := &firehose.SplunkDestinationConfiguration{ HECToken: aws.String(splunk["hec_token"].(string)), HECEndpointType: aws.String(splunk["hec_endpoint_type"].(string)), @@ -1995,7 +2021,7 @@ func createSplunkConfig(d *schema.ResourceData, s3Config *firehose.S3Destination return configuration, nil } -func updateSplunkConfig(d *schema.ResourceData, s3Update *firehose.S3DestinationUpdate) (*firehose.SplunkDestinationUpdate, error) { +func updateSplunkConfig(d *schema.ResourceData) (*firehose.SplunkDestinationUpdate, error) { splunkRaw, ok := d.GetOk("splunk_configuration") if !ok { return nil, fmt.Errorf("Error loading Splunk Configuration for Kinesis Firehose: splunk_configuration not found") @@ -2004,6 +2030,8 @@ func updateSplunkConfig(d *schema.ResourceData, s3Update *firehose.S3Destination splunk := sl[0].(map[string]interface{}) + s3Update := updateS3Config(d, true) + configuration := &firehose.SplunkDestinationUpdate{ HECToken: aws.String(splunk["hec_token"].(string)), HECEndpointType: aws.String(splunk["hec_endpoint_type"].(string)), @@ -2107,33 +2135,31 @@ func resourceAwsKinesisFirehoseDeliveryStreamCreate(d *schema.ResourceData, meta createInput.DeliveryStreamType = aws.String(firehose.DeliveryStreamTypeDirectPut) } - if d.Get("destination").(string) == "extended_s3" { + switch destination := d.Get("destination").(string); destination { + case "extended_s3": extendedS3Config := createExtendedS3Config(d) createInput.ExtendedS3DestinationConfiguration = extendedS3Config - } else { - s3Config := createS3Config(d) - - if d.Get("destination").(string) == "s3" { - createInput.S3DestinationConfiguration = s3Config - } else if d.Get("destination").(string) == "elasticsearch" { - esConfig, err := createElasticsearchConfig(d, s3Config) - if err != nil { - return err - } - createInput.ElasticsearchDestinationConfiguration = esConfig - } else if d.Get("destination").(string) == "redshift" { - rc, err := createRedshiftConfig(d, s3Config) - if err != nil { - return err - } - createInput.RedshiftDestinationConfiguration = rc - } else if d.Get("destination").(string) == "splunk" { - rc, err := createSplunkConfig(d, s3Config) - if err != nil { - return err - } - createInput.SplunkDestinationConfiguration = rc + case "s3": + s3Config := createS3Config(d, true) + createInput.S3DestinationConfiguration = s3Config + case "elasticsearch": + esConfig, err := 
createElasticsearchConfig(d) + if err != nil { + return err + } + createInput.ElasticsearchDestinationConfiguration = esConfig + case "redshift": + rc, err := createRedshiftConfig(d) + if err != nil { + return err } + createInput.RedshiftDestinationConfiguration = rc + case "splunk": + rc, err := createSplunkConfig(d) + if err != nil { + return err + } + createInput.SplunkDestinationConfiguration = rc } if v, ok := d.GetOk("tags"); ok { @@ -2243,33 +2269,31 @@ func resourceAwsKinesisFirehoseDeliveryStreamUpdate(d *schema.ResourceData, meta DestinationId: aws.String(d.Get("destination_id").(string)), } - if d.Get("destination").(string) == "extended_s3" { + switch destination := d.Get("destination").(string); destination { + case "extended_s3": extendedS3Config := updateExtendedS3Config(d) updateInput.ExtendedS3DestinationUpdate = extendedS3Config - } else { - s3Config := updateS3Config(d) - - if d.Get("destination").(string) == "s3" { - updateInput.S3DestinationUpdate = s3Config - } else if d.Get("destination").(string) == "elasticsearch" { - esUpdate, err := updateElasticsearchConfig(d, s3Config) - if err != nil { - return err - } - updateInput.ElasticsearchDestinationUpdate = esUpdate - } else if d.Get("destination").(string) == "redshift" { - rc, err := updateRedshiftConfig(d, s3Config) - if err != nil { - return err - } - updateInput.RedshiftDestinationUpdate = rc - } else if d.Get("destination").(string) == "splunk" { - rc, err := updateSplunkConfig(d, s3Config) - if err != nil { - return err - } - updateInput.SplunkDestinationUpdate = rc + case "s3": + s3Config := updateS3Config(d, true) + updateInput.S3DestinationUpdate = s3Config + case "elasticsearch": + esUpdate, err := updateElasticsearchConfig(d) + if err != nil { + return err + } + updateInput.ElasticsearchDestinationUpdate = esUpdate + case "redshift": + rc, err := updateRedshiftConfig(d) + if err != nil { + return err + } + updateInput.RedshiftDestinationUpdate = rc + case "splunk": + rc, err := updateSplunkConfig(d) + if err != nil { + return err } + updateInput.SplunkDestinationUpdate = rc } err := resource.Retry(1*time.Minute, func() *resource.RetryError { diff --git a/aws/resource_aws_kinesis_firehose_delivery_stream_test.go b/aws/resource_aws_kinesis_firehose_delivery_stream_test.go index 96e391b48e8..3fd4ace6872 100644 --- a/aws/resource_aws_kinesis_firehose_delivery_stream_test.go +++ b/aws/resource_aws_kinesis_firehose_delivery_stream_test.go @@ -239,6 +239,61 @@ func TestAccAWSKinesisFirehoseDeliveryStream_s3basicWithTags(t *testing.T) { }) } +func TestAccAWSKinesisFirehoseDeliveryStream_s3basicWithPrefixes(t *testing.T) { + var stream firehose.DeliveryStreamDescription + rInt := acctest.RandInt() + rName := fmt.Sprintf("terraform-kinesis-firehose-basictest-%d", rInt) + resourceName := "aws_kinesis_firehose_delivery_stream.test" + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + Providers: testAccProviders, + CheckDestroy: testAccCheckKinesisFirehoseDeliveryStreamDestroy, + Steps: []resource.TestStep{ + { + Config: testAccKinesisFirehoseDeliveryStreamConfig_s3basicWithPrefixes(rName, rInt, "logs/", ""), + Check: resource.ComposeTestCheckFunc( + testAccCheckKinesisFirehoseDeliveryStreamExists(resourceName, &stream), + testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(&stream, nil, nil, nil, nil, nil), + resource.TestCheckResourceAttr(resourceName, "s3_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "s3_configuration.0.prefix", "logs/"), + 
resource.TestCheckResourceAttr(resourceName, "s3_configuration.0.error_output_prefix", ""), + ), + }, + { + Config: testAccKinesisFirehoseDeliveryStreamConfig_s3basicWithPrefixes(rName, rInt, "logs/", "errors/"), + Check: resource.ComposeTestCheckFunc( + testAccCheckKinesisFirehoseDeliveryStreamExists(resourceName, &stream), + testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(&stream, nil, nil, nil, nil, nil), + resource.TestCheckResourceAttr(resourceName, "s3_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "s3_configuration.0.prefix", "logs/"), + resource.TestCheckResourceAttr(resourceName, "s3_configuration.0.error_output_prefix", "errors/"), + ), + }, + { + Config: testAccKinesisFirehoseDeliveryStreamConfig_s3basicWithPrefixes(rName, rInt, "logs/{timestamp:yyyy-MM-dd}/", "errors/{timestamp:yyyy-MM-dd}/"), + Check: resource.ComposeTestCheckFunc( + testAccCheckKinesisFirehoseDeliveryStreamExists(resourceName, &stream), + testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(&stream, nil, nil, nil, nil, nil), + resource.TestCheckResourceAttr(resourceName, "s3_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "s3_configuration.0.prefix", "logs/{timestamp:yyyy-MM-dd}/"), + resource.TestCheckResourceAttr(resourceName, "s3_configuration.0.error_output_prefix", "errors/{timestamp:yyyy-MM-dd}/"), + ), + }, + { + Config: testAccKinesisFirehoseDeliveryStreamConfig_s3basicWithPrefixes(rName, rInt, "", ""), + Check: resource.ComposeTestCheckFunc( + testAccCheckKinesisFirehoseDeliveryStreamExists(resourceName, &stream), + testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(&stream, nil, nil, nil, nil, nil), + resource.TestCheckResourceAttr(resourceName, "s3_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "s3_configuration.0.prefix", ""), + resource.TestCheckResourceAttr(resourceName, "s3_configuration.0.error_output_prefix", ""), + ), + }, + }, + }) +} + func TestAccAWSKinesisFirehoseDeliveryStream_s3KinesisStreamSource(t *testing.T) { var stream firehose.DeliveryStreamDescription ri := acctest.RandInt() @@ -705,6 +760,14 @@ func TestAccAWSKinesisFirehoseDeliveryStream_ExtendedS3_ErrorOutputPrefix(t *tes resource.TestCheckResourceAttr(resourceName, "extended_s3_configuration.0.error_output_prefix", "prefix2"), ), }, + { + Config: testAccKinesisFirehoseDeliveryStreamConfig_ExtendedS3_ErrorOutputPrefix(rName, rInt, ""), + Check: resource.ComposeTestCheckFunc( + testAccCheckKinesisFirehoseDeliveryStreamExists(resourceName, &stream), + resource.TestCheckResourceAttr(resourceName, "extended_s3_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "extended_s3_configuration.0.error_output_prefix", ""), + ), + }, }, }) } @@ -1600,6 +1663,23 @@ func testAccKinesisFirehoseDeliveryStreamConfig_s3basicWithTags(rName string, rI `, rName) } +func testAccKinesisFirehoseDeliveryStreamConfig_s3basicWithPrefixes(rName string, rInt int, prefix string, errorOutputPrefix string) string { + return fmt.Sprintf(testAccKinesisFirehoseDeliveryStreamBaseConfig, rInt, rInt, rInt) + + fmt.Sprintf(` + resource "aws_kinesis_firehose_delivery_stream" "test" { + depends_on = ["aws_iam_role_policy.firehose"] + name = "%s" + destination = "s3" + s3_configuration { + role_arn = "${aws_iam_role.firehose.arn}" + bucket_arn = "${aws_s3_bucket.bucket.arn}" + prefix = "%s" + error_output_prefix = "%s" + } + } +`, rName, prefix, errorOutputPrefix) +} + func testAccKinesisFirehoseDeliveryStreamConfig_s3basicWithTagsChanged(rName string, rInt int) 
string { return fmt.Sprintf(testAccKinesisFirehoseDeliveryStreamBaseConfig, rInt, rInt, rInt) + fmt.Sprintf(` diff --git a/website/docs/r/kinesis_firehose_delivery_stream.html.markdown b/website/docs/r/kinesis_firehose_delivery_stream.html.markdown index 6fc6e7ef88d..f0029e7480b 100644 --- a/website/docs/r/kinesis_firehose_delivery_stream.html.markdown +++ b/website/docs/r/kinesis_firehose_delivery_stream.html.markdown @@ -278,6 +278,7 @@ The `s3_configuration` object supports the following: * `role_arn` - (Required) The ARN of the AWS credentials. * `bucket_arn` - (Required) The ARN of the S3 bucket * `prefix` - (Optional) The "YYYY/MM/DD/HH" time format prefix is automatically used for delivered S3 files. You can specify an extra prefix to be added in front of the time format prefix. Note that if the prefix ends with a slash, it appears as a folder in the S3 bucket +* `error_output_prefix` - (Optional) Prefix added to failed records before writing them to S3. This prefix appears immediately following the bucket name. * `buffer_size` - (Optional) Buffer incoming data to the specified size, in MBs, before delivering it to the destination. The default value is 5. We recommend setting SizeInMBs to a value greater than the amount of data you typically ingest into the delivery stream in 10 seconds. For example, if you typically ingest data at 1 MB/sec set SizeInMBs to be 10 MB or higher. * `buffer_interval` - (Optional) Buffer incoming data for the specified period of time, in seconds, before delivering it to the destination. The default value is 300. @@ -289,7 +290,6 @@ be used. The `extended_s3_configuration` object supports the same fields from `s3_configuration` as well as the following: * `data_format_conversion_configuration` - (Optional) Nested argument for the serializer, deserializer, and schema for converting data from the JSON format to the Parquet or ORC format before writing it to Amazon S3. More details given below. -* `error_output_prefix` - (Optional) Prefix added to failed records before writing them to S3. This prefix appears immediately following the bucket name. * `processing_configuration` - (Optional) The data processing configuration. More details are given below. * `s3_backup_mode` - (Optional) The Amazon S3 backup mode. Valid values are `Disabled` and `Enabled`. Default value is `Disabled`. * `s3_backup_configuration` - (Optional) The configuration for backup in Amazon S3. Required if `s3_backup_mode` is `Enabled`. Supports the same fields as `s3_configuration` object. 
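
For reference, a minimal Terraform configuration exercising the argument this patch adds to the plain `s3` destination could look like the sketch below. It is modeled on the acceptance-test config above rather than taken from the patch itself; the stream name and the role/bucket references are placeholders:

resource "aws_kinesis_firehose_delivery_stream" "example" {
  name        = "example-stream"
  destination = "s3"

  s3_configuration {
    role_arn   = aws_iam_role.firehose.arn
    bucket_arn = aws_s3_bucket.bucket.arn

    # Delivered objects land under logs/; failed records land under errors/.
    prefix              = "logs/"
    error_output_prefix = "errors/"
  }
}
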
From 6f170536754de02a5130fd0b4319056742d85e13 Mon Sep 17 00:00:00 2001 From: Angie Pinilla Date: Mon, 13 Dec 2021 17:56:07 -0500 Subject: [PATCH 2/3] terrafmt fix --- internal/service/firehose/delivery_stream_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/service/firehose/delivery_stream_test.go b/internal/service/firehose/delivery_stream_test.go index 2ddcfc7d9f8..70ff80caaf7 100644 --- a/internal/service/firehose/delivery_stream_test.go +++ b/internal/service/firehose/delivery_stream_test.go @@ -146,7 +146,7 @@ func TestAccFirehoseDeliveryStream_s3basicWithPrefixes(t *testing.T) { ), }, { - Config: testAccDeliveryStreamConfig_s3basicWithPrefixes(rName,"", ""), + Config: testAccDeliveryStreamConfig_s3basicWithPrefixes(rName, "", ""), Check: resource.ComposeTestCheckFunc( testAccCheckDeliveryStreamExists(resourceName, &stream), testAccCheckDeliveryStreamAttributes(&stream, nil, nil, nil, nil, nil, nil), @@ -1978,8 +1978,8 @@ func testAccDeliveryStreamConfig_s3basicWithPrefixes(rName, prefix, errorOutputP testAccDeliveryStreamBaseConfig(rName), fmt.Sprintf(` resource "aws_kinesis_firehose_delivery_stream" "test" { - depends_on = ["aws_iam_role_policy.firehose"] - name = %[1]q + depends_on = ["aws_iam_role_policy.firehose"] + name = %[1]q destination = "s3" s3_configuration { From b2c99bda196bf54767f56f895ecda1bd02b4921a Mon Sep 17 00:00:00 2001 From: Angie Pinilla Date: Thu, 23 Dec 2021 16:30:35 -0500 Subject: [PATCH 3/3] CR updates --- .changelog/11229.txt | 11 + internal/service/firehose/delivery_stream.go | 203 ++++++----- .../service/firehose/delivery_stream_test.go | 317 +++++++++++++++++- ...sis_firehose_delivery_stream.html.markdown | 6 +- 4 files changed, 425 insertions(+), 112 deletions(-) create mode 100644 .changelog/11229.txt diff --git a/.changelog/11229.txt b/.changelog/11229.txt new file mode 100644 index 00000000000..f410d339fb1 --- /dev/null +++ b/.changelog/11229.txt @@ -0,0 +1,11 @@ +```release-note:enhancement +resource/aws_kinesis_firehose_delivery_stream: Add `error_output_prefix` argument to `s3_configuration` configuration block +``` + +```release-note:enhancement +resource/aws_kinesis_firehose_delivery_stream: Add `error_output_prefix` argument to `redshift_configuration` `s3_backup_configuration` configuration block +``` + +```release-note:enhancement +resource/aws_kinesis_firehose_delivery_stream: Add `error_output_prefix` argument to `extended_s3_configuration` `s3_backup_configuration` configuration block +``` diff --git a/internal/service/firehose/delivery_stream.go b/internal/service/firehose/delivery_stream.go index c0841d0ab1e..3cf81e38d0c 100644 --- a/internal/service/firehose/delivery_stream.go +++ b/internal/service/firehose/delivery_stream.go @@ -152,8 +152,9 @@ func s3ConfigurationSchema() *schema.Schema { }, "error_output_prefix": { - Type: schema.TypeString, - Optional: true, + Type: schema.TypeString, + Optional: true, + ValidateFunc: validation.StringLenBetween(0, 1024), }, "kms_key_arn": { @@ -1231,8 +1232,9 @@ func ResourceDeliveryStream() *schema.Resource { }, "error_output_prefix": { - Type: schema.TypeString, - Optional: true, + Type: schema.TypeString, + Optional: true, + ValidateFunc: validation.StringLenBetween(0, 1024), }, "kms_key_arn": { @@ -1608,7 +1610,7 @@ func createSourceConfig(source map[string]interface{}) *firehose.KinesisStreamSo return configuration } -func createS3Config(d *schema.ResourceData, withErrorOutputPrefix bool) *firehose.S3DestinationConfiguration { +func createS3Config(d 
*schema.ResourceData) *firehose.S3DestinationConfiguration { s3 := d.Get("s3_configuration").([]interface{})[0].(map[string]interface{}) configuration := &firehose.S3DestinationConfiguration{ @@ -1623,8 +1625,8 @@ func createS3Config(d *schema.ResourceData, withErrorOutputPrefix bool) *firehos EncryptionConfiguration: extractEncryptionConfiguration(s3), } - if withErrorOutputPrefix { - configuration.ErrorOutputPrefix = extractErrorOutputPrefixConfiguration(s3) + if v, ok := s3["error_output_prefix"].(string); ok && v != "" { + configuration.ErrorOutputPrefix = aws.String(v) } if _, ok := s3["cloudwatch_logging_options"]; ok { @@ -1658,6 +1660,10 @@ func expandS3BackupConfig(d map[string]interface{}) *firehose.S3DestinationConfi configuration.CloudWatchLoggingOptions = extractCloudWatchLoggingConfiguration(s3) } + if v, ok := s3["error_output_prefix"].(string); ok && v != "" { + configuration.ErrorOutputPrefix = aws.String(v) + } + return configuration } @@ -1671,7 +1677,6 @@ func createExtendedS3Config(d *schema.ResourceData) *firehose.ExtendedS3Destinat IntervalInSeconds: aws.Int64(int64(s3["buffer_interval"].(int))), SizeInMBs: aws.Int64(int64(s3["buffer_size"].(int))), }, - ErrorOutputPrefix: extractErrorOutputPrefixConfiguration(s3), Prefix: extractPrefixConfiguration(s3), CompressionFormat: aws.String(s3["compression_format"].(string)), DataFormatConversionConfiguration: expandDataFormatConversionConfiguration(s3["data_format_conversion_configuration"].([]interface{})), @@ -1690,6 +1695,10 @@ func createExtendedS3Config(d *schema.ResourceData) *firehose.ExtendedS3Destinat configuration.CloudWatchLoggingOptions = extractCloudWatchLoggingConfiguration(s3) } + if v, ok := s3["error_output_prefix"].(string); ok && v != "" { + configuration.ErrorOutputPrefix = aws.String(v) + } + if s3BackupMode, ok := s3["s3_backup_mode"]; ok { configuration.S3BackupMode = aws.String(s3BackupMode.(string)) configuration.S3BackupConfiguration = expandS3BackupConfig(d.Get("extended_s3_configuration").([]interface{})[0].(map[string]interface{})) @@ -1698,7 +1707,7 @@ func createExtendedS3Config(d *schema.ResourceData) *firehose.ExtendedS3Destinat return configuration } -func updateS3Config(d *schema.ResourceData, withErrorOutputPrefix bool) *firehose.S3DestinationUpdate { +func updateS3Config(d *schema.ResourceData) *firehose.S3DestinationUpdate { s3 := d.Get("s3_configuration").([]interface{})[0].(map[string]interface{}) configuration := &firehose.S3DestinationUpdate{ @@ -1708,16 +1717,13 @@ func updateS3Config(d *schema.ResourceData, withErrorOutputPrefix bool) *firehos IntervalInSeconds: aws.Int64((int64)(s3["buffer_interval"].(int))), SizeInMBs: aws.Int64((int64)(s3["buffer_size"].(int))), }, + ErrorOutputPrefix: aws.String(s3["error_output_prefix"].(string)), Prefix: extractPrefixConfiguration(s3), CompressionFormat: aws.String(s3["compression_format"].(string)), EncryptionConfiguration: extractEncryptionConfiguration(s3), CloudWatchLoggingOptions: extractCloudWatchLoggingConfiguration(s3), } - if withErrorOutputPrefix { - configuration.ErrorOutputPrefix = extractErrorOutputPrefixConfiguration(s3) - } - if _, ok := s3["cloudwatch_logging_options"]; ok { configuration.CloudWatchLoggingOptions = extractCloudWatchLoggingConfiguration(s3) } @@ -1740,6 +1746,7 @@ func updateS3BackupConfig(d map[string]interface{}) *firehose.S3DestinationUpdat IntervalInSeconds: aws.Int64((int64)(s3["buffer_interval"].(int))), SizeInMBs: aws.Int64((int64)(s3["buffer_size"].(int))), }, + ErrorOutputPrefix: 
aws.String(s3["error_output_prefix"].(string)), Prefix: extractPrefixConfiguration(s3), CompressionFormat: aws.String(s3["compression_format"].(string)), EncryptionConfiguration: extractEncryptionConfiguration(s3), @@ -1763,7 +1770,7 @@ func updateExtendedS3Config(d *schema.ResourceData) *firehose.ExtendedS3Destinat IntervalInSeconds: aws.Int64((int64)(s3["buffer_interval"].(int))), SizeInMBs: aws.Int64((int64)(s3["buffer_size"].(int))), }, - ErrorOutputPrefix: extractErrorOutputPrefixConfiguration(s3), + ErrorOutputPrefix: aws.String(s3["error_output_prefix"].(string)), Prefix: extractPrefixConfiguration(s3), CompressionFormat: aws.String(s3["compression_format"].(string)), EncryptionConfiguration: extractEncryptionConfiguration(s3), @@ -2086,14 +2093,6 @@ func extractCloudWatchLoggingConfiguration(s3 map[string]interface{}) *firehose. } -func extractErrorOutputPrefixConfiguration(s3 map[string]interface{}) *string { - if v, ok := s3["error_output_prefix"]; ok { - return aws.String(v.(string)) - } - - return nil -} - func extractVPCConfiguration(es map[string]interface{}) *firehose.VpcConfiguration { config := es["vpc_config"].([]interface{}) if len(config) == 0 { @@ -2117,7 +2116,7 @@ func extractPrefixConfiguration(s3 map[string]interface{}) *string { return nil } -func createRedshiftConfig(d *schema.ResourceData) (*firehose.RedshiftDestinationConfiguration, error) { +func createRedshiftConfig(d *schema.ResourceData, s3Config *firehose.S3DestinationConfiguration) (*firehose.RedshiftDestinationConfiguration, error) { redshiftRaw, ok := d.GetOk("redshift_configuration") if !ok { return nil, fmt.Errorf("Error loading Redshift Configuration for Kinesis Firehose: redshift_configuration not found") @@ -2126,8 +2125,6 @@ func createRedshiftConfig(d *schema.ResourceData) (*firehose.RedshiftDestination redshift := rl[0].(map[string]interface{}) - s3Config := createS3Config(d, false) - configuration := &firehose.RedshiftDestinationConfiguration{ ClusterJDBCURL: aws.String(redshift["cluster_jdbcurl"].(string)), RetryOptions: extractRedshiftRetryOptions(redshift), @@ -2152,7 +2149,7 @@ func createRedshiftConfig(d *schema.ResourceData) (*firehose.RedshiftDestination return configuration, nil } -func updateRedshiftConfig(d *schema.ResourceData) (*firehose.RedshiftDestinationUpdate, error) { +func updateRedshiftConfig(d *schema.ResourceData, s3Update *firehose.S3DestinationUpdate) (*firehose.RedshiftDestinationUpdate, error) { redshiftRaw, ok := d.GetOk("redshift_configuration") if !ok { return nil, fmt.Errorf("Error loading Redshift Configuration for Kinesis Firehose: redshift_configuration not found") @@ -2161,8 +2158,6 @@ func updateRedshiftConfig(d *schema.ResourceData) (*firehose.RedshiftDestination redshift := rl[0].(map[string]interface{}) - s3Update := updateS3Config(d, false) - configuration := &firehose.RedshiftDestinationUpdate{ ClusterJDBCURL: aws.String(redshift["cluster_jdbcurl"].(string)), RetryOptions: extractRedshiftRetryOptions(redshift), @@ -2182,12 +2177,18 @@ func updateRedshiftConfig(d *schema.ResourceData) (*firehose.RedshiftDestination if s3BackupMode, ok := redshift["s3_backup_mode"]; ok { configuration.S3BackupMode = aws.String(s3BackupMode.(string)) configuration.S3BackupUpdate = updateS3BackupConfig(d.Get("redshift_configuration").([]interface{})[0].(map[string]interface{})) + if configuration.S3BackupUpdate != nil { + // Redshift does not currently support ErrorOutputPrefix, + // which is set to the empty string within "updateS3BackupConfig", + // thus we must remove it 
here to avoid an InvalidArgumentException. + configuration.S3BackupUpdate.ErrorOutputPrefix = nil + } } return configuration, nil } -func createElasticsearchConfig(d *schema.ResourceData) (*firehose.ElasticsearchDestinationConfiguration, error) { +func createElasticsearchConfig(d *schema.ResourceData, s3Config *firehose.S3DestinationConfiguration) (*firehose.ElasticsearchDestinationConfiguration, error) { esConfig, ok := d.GetOk("elasticsearch_configuration") if !ok { return nil, fmt.Errorf("Error loading Elasticsearch Configuration for Kinesis Firehose: elasticsearch_configuration not found") @@ -2196,8 +2197,6 @@ func createElasticsearchConfig(d *schema.ResourceData) (*firehose.ElasticsearchD es := esList[0].(map[string]interface{}) - s3Config := createS3Config(d, true) - config := &firehose.ElasticsearchDestinationConfiguration{ BufferingHints: extractBufferingHints(es), IndexName: aws.String(es["index_name"].(string)), @@ -2237,7 +2236,7 @@ func createElasticsearchConfig(d *schema.ResourceData) (*firehose.ElasticsearchD return config, nil } -func updateElasticsearchConfig(d *schema.ResourceData) (*firehose.ElasticsearchDestinationUpdate, error) { +func updateElasticsearchConfig(d *schema.ResourceData, s3Update *firehose.S3DestinationUpdate) (*firehose.ElasticsearchDestinationUpdate, error) { esConfig, ok := d.GetOk("elasticsearch_configuration") if !ok { return nil, fmt.Errorf("Error loading Elasticsearch Configuration for Kinesis Firehose: elasticsearch_configuration not found") @@ -2246,8 +2245,6 @@ func updateElasticsearchConfig(d *schema.ResourceData) (*firehose.ElasticsearchD es := esList[0].(map[string]interface{}) - s3Update := updateS3Config(d, true) - update := &firehose.ElasticsearchDestinationUpdate{ BufferingHints: extractBufferingHints(es), IndexName: aws.String(es["index_name"].(string)), @@ -2280,7 +2277,7 @@ func updateElasticsearchConfig(d *schema.ResourceData) (*firehose.ElasticsearchD return update, nil } -func createSplunkConfig(d *schema.ResourceData) (*firehose.SplunkDestinationConfiguration, error) { +func createSplunkConfig(d *schema.ResourceData, s3Config *firehose.S3DestinationConfiguration) (*firehose.SplunkDestinationConfiguration, error) { splunkRaw, ok := d.GetOk("splunk_configuration") if !ok { return nil, fmt.Errorf("Error loading Splunk Configuration for Kinesis Firehose: splunk_configuration not found") @@ -2289,8 +2286,6 @@ func createSplunkConfig(d *schema.ResourceData) (*firehose.SplunkDestinationConf splunk := sl[0].(map[string]interface{}) - s3Config := createS3Config(d, true) - configuration := &firehose.SplunkDestinationConfiguration{ HECToken: aws.String(splunk["hec_token"].(string)), HECEndpointType: aws.String(splunk["hec_endpoint_type"].(string)), @@ -2314,7 +2309,7 @@ func createSplunkConfig(d *schema.ResourceData) (*firehose.SplunkDestinationConf return configuration, nil } -func updateSplunkConfig(d *schema.ResourceData) (*firehose.SplunkDestinationUpdate, error) { +func updateSplunkConfig(d *schema.ResourceData, s3Update *firehose.S3DestinationUpdate) (*firehose.SplunkDestinationUpdate, error) { splunkRaw, ok := d.GetOk("splunk_configuration") if !ok { return nil, fmt.Errorf("Error loading Splunk Configuration for Kinesis Firehose: splunk_configuration not found") @@ -2323,8 +2318,6 @@ func updateSplunkConfig(d *schema.ResourceData) (*firehose.SplunkDestinationUpda splunk := sl[0].(map[string]interface{}) - s3Update := updateS3Config(d, true) - configuration := &firehose.SplunkDestinationUpdate{ HECToken: 
aws.String(splunk["hec_token"].(string)), HECEndpointType: aws.String(splunk["hec_endpoint_type"].(string)), @@ -2348,7 +2341,7 @@ func updateSplunkConfig(d *schema.ResourceData) (*firehose.SplunkDestinationUpda return configuration, nil } -func createHTTPEndpointConfig(d *schema.ResourceData) (*firehose.HttpEndpointDestinationConfiguration, error) { +func createHTTPEndpointConfig(d *schema.ResourceData, s3Config *firehose.S3DestinationConfiguration) (*firehose.HttpEndpointDestinationConfiguration, error) { HttpEndpointRaw, ok := d.GetOk("http_endpoint_configuration") if !ok { return nil, fmt.Errorf("Error loading HTTP Endpoint Configuration for Kinesis Firehose: http_endpoint_configuration not found") @@ -2357,8 +2350,6 @@ func createHTTPEndpointConfig(d *schema.ResourceData) (*firehose.HttpEndpointDes HttpEndpoint := sl[0].(map[string]interface{}) - s3Config := createS3Config(d, true) - configuration := &firehose.HttpEndpointDestinationConfiguration{ RetryOptions: extractHTTPEndpointRetryOptions(HttpEndpoint), RoleARN: aws.String(HttpEndpoint["role_arn"].(string)), @@ -2395,7 +2386,7 @@ func createHTTPEndpointConfig(d *schema.ResourceData) (*firehose.HttpEndpointDes return configuration, nil } -func updateHTTPEndpointConfig(d *schema.ResourceData) (*firehose.HttpEndpointDestinationUpdate, error) { +func updateHTTPEndpointConfig(d *schema.ResourceData, s3Update *firehose.S3DestinationUpdate) (*firehose.HttpEndpointDestinationUpdate, error) { HttpEndpointRaw, ok := d.GetOk("http_endpoint_configuration") if !ok { return nil, fmt.Errorf("Error loading HTTP Endpoint Configuration for Kinesis Firehose: http_endpoint_configuration not found") @@ -2404,8 +2395,6 @@ func updateHTTPEndpointConfig(d *schema.ResourceData) (*firehose.HttpEndpointDes HttpEndpoint := sl[0].(map[string]interface{}) - s3Update := updateS3Config(d, true) - configuration := &firehose.HttpEndpointDestinationUpdate{ RetryOptions: extractHTTPEndpointRetryOptions(HttpEndpoint), RoleARN: aws.String(HttpEndpoint["role_arn"].(string)), @@ -2587,37 +2576,39 @@ func resourceDeliveryStreamCreate(d *schema.ResourceData, meta interface{}) erro createInput.DeliveryStreamType = aws.String(firehose.DeliveryStreamTypeDirectPut) } - switch destination := d.Get("destination").(string); destination { - case "extended_s3": + if d.Get("destination").(string) == destinationTypeExtendedS3 { extendedS3Config := createExtendedS3Config(d) createInput.ExtendedS3DestinationConfiguration = extendedS3Config - case "s3": - s3Config := createS3Config(d, true) - createInput.S3DestinationConfiguration = s3Config - case "elasticsearch": - esConfig, err := createElasticsearchConfig(d) - if err != nil { - return err - } - createInput.ElasticsearchDestinationConfiguration = esConfig - case "redshift": - rc, err := createRedshiftConfig(d) - if err != nil { - return err - } - createInput.RedshiftDestinationConfiguration = rc - case "splunk": - rc, err := createSplunkConfig(d) - if err != nil { - return err - } - createInput.SplunkDestinationConfiguration = rc - case "http_endpoint": - rc, err := createHTTPEndpointConfig(d) - if err != nil { - return err + } else { + s3Config := createS3Config(d) + + if d.Get("destination").(string) == destinationTypeS3 { + createInput.S3DestinationConfiguration = s3Config + } else if d.Get("destination").(string) == destinationTypeElasticsearch { + esConfig, err := createElasticsearchConfig(d, s3Config) + if err != nil { + return err + } + createInput.ElasticsearchDestinationConfiguration = esConfig + } else if 
d.Get("destination").(string) == destinationTypeRedshift { + rc, err := createRedshiftConfig(d, s3Config) + if err != nil { + return err + } + createInput.RedshiftDestinationConfiguration = rc + } else if d.Get("destination").(string) == destinationTypeSplunk { + rc, err := createSplunkConfig(d, s3Config) + if err != nil { + return err + } + createInput.SplunkDestinationConfiguration = rc + } else if d.Get("destination").(string) == destinationTypeHttpEndpoint { + rc, err := createHTTPEndpointConfig(d, s3Config) + if err != nil { + return err + } + createInput.HttpEndpointDestinationConfiguration = rc } - createInput.HttpEndpointDestinationConfiguration = rc } if len(tags) > 0 { @@ -2737,37 +2728,45 @@ func resourceDeliveryStreamUpdate(d *schema.ResourceData, meta interface{}) erro DestinationId: aws.String(d.Get("destination_id").(string)), } - switch destination := d.Get("destination").(string); destination { - case "extended_s3": + if d.Get("destination").(string) == destinationTypeExtendedS3 { extendedS3Config := updateExtendedS3Config(d) updateInput.ExtendedS3DestinationUpdate = extendedS3Config - case "s3": - s3Config := updateS3Config(d, true) - updateInput.S3DestinationUpdate = s3Config - case "elasticsearch": - esUpdate, err := updateElasticsearchConfig(d) - if err != nil { - return err - } - updateInput.ElasticsearchDestinationUpdate = esUpdate - case "redshift": - rc, err := updateRedshiftConfig(d) - if err != nil { - return err - } - updateInput.RedshiftDestinationUpdate = rc - case "splunk": - rc, err := updateSplunkConfig(d) - if err != nil { - return err - } - updateInput.SplunkDestinationUpdate = rc - case "http_endpoint": - rc, err := updateHTTPEndpointConfig(d) - if err != nil { - return err + } else { + s3Config := updateS3Config(d) + + if d.Get("destination").(string) == destinationTypeS3 { + updateInput.S3DestinationUpdate = s3Config + } else if d.Get("destination").(string) == destinationTypeElasticsearch { + esUpdate, err := updateElasticsearchConfig(d, s3Config) + if err != nil { + return err + } + updateInput.ElasticsearchDestinationUpdate = esUpdate + } else if d.Get("destination").(string) == destinationTypeRedshift { + // Redshift does not currently support ErrorOutputPrefix, + // which is set to the empty string within "updateS3Config", + // thus we must remove it here to avoid an InvalidArgumentException. 
+ if s3Config != nil { + s3Config.ErrorOutputPrefix = nil + } + rc, err := updateRedshiftConfig(d, s3Config) + if err != nil { + return err + } + updateInput.RedshiftDestinationUpdate = rc + } else if d.Get("destination").(string) == destinationTypeSplunk { + rc, err := updateSplunkConfig(d, s3Config) + if err != nil { + return err + } + updateInput.SplunkDestinationUpdate = rc + } else if d.Get("destination").(string) == destinationTypeHttpEndpoint { + rc, err := updateHTTPEndpointConfig(d, s3Config) + if err != nil { + return err + } + updateInput.HttpEndpointDestinationUpdate = rc } - updateInput.HttpEndpointDestinationUpdate = rc } err := resource.Retry(tfiam.PropagationTimeout, func() *resource.RetryError { diff --git a/internal/service/firehose/delivery_stream_test.go b/internal/service/firehose/delivery_stream_test.go index 70ff80caaf7..c4c92583aef 100644 --- a/internal/service/firehose/delivery_stream_test.go +++ b/internal/service/firehose/delivery_stream_test.go @@ -356,14 +356,6 @@ func TestAccFirehoseDeliveryStream_s3KinesisStreamSource(t *testing.T) { testAccCheckDeliveryStreamAttributes(&stream, nil, nil, nil, nil, nil, nil), ), }, - { - Config: testAccDeliveryStreamConfig_ExtendedS3_ErrorOutputPrefix(rName, ""), - Check: resource.ComposeTestCheckFunc( - testAccCheckDeliveryStreamExists(resourceName, &stream), - resource.TestCheckResourceAttr(resourceName, "extended_s3_configuration.#", "1"), - resource.TestCheckResourceAttr(resourceName, "extended_s3_configuration.0.error_output_prefix", ""), - ), - }, }, }) } @@ -801,6 +793,71 @@ func TestAccFirehoseDeliveryStream_ExtendedS3_errorOutputPrefix(t *testing.T) { resource.TestCheckResourceAttr(resourceName, "extended_s3_configuration.0.error_output_prefix", "prefix2"), ), }, + { + ResourceName: resourceName, + ImportState: true, + ImportStateVerify: true, + }, + { + // Ensure the ErrorOutputPrefix can be updated to an empty value + // Reference: https://github.com/hashicorp/terraform-provider-aws/pull/11229#discussion_r356282765 + Config: testAccDeliveryStreamConfig_ExtendedS3_ErrorOutputPrefix(rName, ""), + Check: resource.ComposeTestCheckFunc( + testAccCheckDeliveryStreamExists(resourceName, &stream), + resource.TestCheckResourceAttr(resourceName, "extended_s3_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "extended_s3_configuration.0.error_output_prefix", ""), + ), + }, + }, + }) +} + +func TestAccFirehoseDeliveryStream_ExtendedS3_S3BackupConfiguration_ErrorOutputPrefix(t *testing.T) { + var stream firehose.DeliveryStreamDescription + rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix) + resourceName := "aws_kinesis_firehose_delivery_stream.test" + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { acctest.PreCheck(t) }, + ErrorCheck: acctest.ErrorCheck(t, firehose.EndpointsID), + Providers: acctest.Providers, + CheckDestroy: testAccCheckDeliveryStreamDestroy_ExtendedS3, + Steps: []resource.TestStep{ + { + Config: testAccDeliveryStreamConfig_ExtendedS3_S3BackUpConfiguration_ErrorOutputPrefix(rName, "prefix1"), + Check: resource.ComposeTestCheckFunc( + testAccCheckDeliveryStreamExists(resourceName, &stream), + resource.TestCheckResourceAttr(resourceName, "extended_s3_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "extended_s3_configuration.0.s3_backup_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "extended_s3_configuration.0.s3_backup_configuration.0.error_output_prefix", "prefix1"), + ), + }, + { + ResourceName: resourceName, + 
ImportState: true, + ImportStateVerify: true, + }, + { + Config: testAccDeliveryStreamConfig_ExtendedS3_S3BackUpConfiguration_ErrorOutputPrefix(rName, "prefix2"), + Check: resource.ComposeTestCheckFunc( + testAccCheckDeliveryStreamExists(resourceName, &stream), + resource.TestCheckResourceAttr(resourceName, "extended_s3_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "extended_s3_configuration.0.s3_backup_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "extended_s3_configuration.0.s3_backup_configuration.0.error_output_prefix", "prefix2")), + }, + { + ResourceName: resourceName, + ImportState: true, + ImportStateVerify: true, + }, + { + Config: testAccDeliveryStreamConfig_ExtendedS3_S3BackUpConfiguration_ErrorOutputPrefix(rName, ""), + Check: resource.ComposeTestCheckFunc( + testAccCheckDeliveryStreamExists(resourceName, &stream), + resource.TestCheckResourceAttr(resourceName, "extended_s3_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "extended_s3_configuration.0.s3_backup_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "extended_s3_configuration.0.s3_backup_configuration.0.error_output_prefix", "")), + }, }, }) } @@ -1103,6 +1160,55 @@ func TestAccFirehoseDeliveryStream_splunkUpdates(t *testing.T) { }) } +func TestAccFirehoseDeliveryStream_Splunk_ErrorOutputPrefix(t *testing.T) { + var stream firehose.DeliveryStreamDescription + rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix) + resourceName := "aws_kinesis_firehose_delivery_stream.test" + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { acctest.PreCheck(t) }, + ErrorCheck: acctest.ErrorCheck(t, firehose.EndpointsID), + Providers: acctest.Providers, + CheckDestroy: testAccCheckDeliveryStreamDestroy_ExtendedS3, + Steps: []resource.TestStep{ + { + Config: testAccDeliveryStreamConfig_Splunk_ErrorOutputPrefix(rName, "prefix1"), + Check: resource.ComposeTestCheckFunc( + testAccCheckDeliveryStreamExists(resourceName, &stream), + resource.TestCheckResourceAttr(resourceName, "s3_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "s3_configuration.0.error_output_prefix", "prefix1"), + ), + }, + { + ResourceName: resourceName, + ImportState: true, + ImportStateVerify: true, + }, + { + Config: testAccDeliveryStreamConfig_Splunk_ErrorOutputPrefix(rName, "prefix2"), + Check: resource.ComposeTestCheckFunc( + testAccCheckDeliveryStreamExists(resourceName, &stream), + resource.TestCheckResourceAttr(resourceName, "s3_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "s3_configuration.0.error_output_prefix", "prefix2"), + ), + }, + { + ResourceName: resourceName, + ImportState: true, + ImportStateVerify: true, + }, + { + Config: testAccDeliveryStreamConfig_Splunk_ErrorOutputPrefix(rName, ""), + Check: resource.ComposeTestCheckFunc( + testAccCheckDeliveryStreamExists(resourceName, &stream), + resource.TestCheckResourceAttr(resourceName, "s3_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "s3_configuration.0.error_output_prefix", ""), + ), + }, + }, + }) +} + func TestAccFirehoseDeliveryStream_httpEndpoint(t *testing.T) { var stream firehose.DeliveryStreamDescription rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix) @@ -1159,6 +1265,55 @@ func TestAccFirehoseDeliveryStream_httpEndpoint(t *testing.T) { }) } +func TestAccFirehoseDeliveryStream_HTTPEndpoint_ErrorOutputPrefix(t *testing.T) { + var stream firehose.DeliveryStreamDescription + rName := 
sdkacctest.RandomWithPrefix(acctest.ResourcePrefix) + resourceName := "aws_kinesis_firehose_delivery_stream.test" + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { acctest.PreCheck(t) }, + ErrorCheck: acctest.ErrorCheck(t, firehose.EndpointsID), + Providers: acctest.Providers, + CheckDestroy: testAccCheckDeliveryStreamDestroy_ExtendedS3, + Steps: []resource.TestStep{ + { + Config: testAccDeliveryStreamConfig_HTTPEndpoint_ErrorOutputPrefix(rName, "prefix1"), + Check: resource.ComposeTestCheckFunc( + testAccCheckDeliveryStreamExists(resourceName, &stream), + resource.TestCheckResourceAttr(resourceName, "s3_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "s3_configuration.0.error_output_prefix", "prefix1"), + ), + }, + { + ResourceName: resourceName, + ImportState: true, + ImportStateVerify: true, + }, + { + Config: testAccDeliveryStreamConfig_HTTPEndpoint_ErrorOutputPrefix(rName, "prefix2"), + Check: resource.ComposeTestCheckFunc( + testAccCheckDeliveryStreamExists(resourceName, &stream), + resource.TestCheckResourceAttr(resourceName, "s3_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "s3_configuration.0.error_output_prefix", "prefix2"), + ), + }, + { + ResourceName: resourceName, + ImportState: true, + ImportStateVerify: true, + }, + { + Config: testAccDeliveryStreamConfig_HTTPEndpoint_ErrorOutputPrefix(rName, ""), + Check: resource.ComposeTestCheckFunc( + testAccCheckDeliveryStreamExists(resourceName, &stream), + resource.TestCheckResourceAttr(resourceName, "s3_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "s3_configuration.0.error_output_prefix", ""), + ), + }, + }, + }) +} + func TestAccFirehoseDeliveryStream_HTTPEndpoint_retryDuration(t *testing.T) { var stream firehose.DeliveryStreamDescription rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix) @@ -1363,6 +1518,55 @@ func TestAccFirehoseDeliveryStream_elasticSearchWithVPCUpdates(t *testing.T) { }) } +func TestAccFirehoseDeliveryStream_Elasticsearch_ErrorOutputPrefix(t *testing.T) { + var stream firehose.DeliveryStreamDescription + rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix) + resourceName := "aws_kinesis_firehose_delivery_stream.test" + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { acctest.PreCheck(t) }, + ErrorCheck: acctest.ErrorCheck(t, firehose.EndpointsID), + Providers: acctest.Providers, + CheckDestroy: testAccCheckDeliveryStreamDestroy_ExtendedS3, + Steps: []resource.TestStep{ + { + Config: testAccDeliveryStreamConfig_Elasticsearch_ErrorOutputPrefix(rName, "prefix1"), + Check: resource.ComposeTestCheckFunc( + testAccCheckDeliveryStreamExists(resourceName, &stream), + resource.TestCheckResourceAttr(resourceName, "s3_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "s3_configuration.0.error_output_prefix", "prefix1"), + ), + }, + { + ResourceName: resourceName, + ImportState: true, + ImportStateVerify: true, + }, + { + Config: testAccDeliveryStreamConfig_Elasticsearch_ErrorOutputPrefix(rName, "prefix2"), + Check: resource.ComposeTestCheckFunc( + testAccCheckDeliveryStreamExists(resourceName, &stream), + resource.TestCheckResourceAttr(resourceName, "s3_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "s3_configuration.0.error_output_prefix", "prefix2"), + ), + }, + { + ResourceName: resourceName, + ImportState: true, + ImportStateVerify: true, + }, + { + Config: testAccDeliveryStreamConfig_Elasticsearch_ErrorOutputPrefix(rName, ""), + Check: 
resource.ComposeTestCheckFunc( + testAccCheckDeliveryStreamExists(resourceName, &stream), + resource.TestCheckResourceAttr(resourceName, "s3_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "s3_configuration.0.error_output_prefix", ""), + ), + }, + }, + }) +} + // Regression test for https://github.com/hashicorp/terraform-provider-aws/issues/1657 func TestAccFirehoseDeliveryStream_missingProcessing(t *testing.T) { var stream firehose.DeliveryStreamDescription @@ -2574,6 +2778,30 @@ resource "aws_kinesis_firehose_delivery_stream" "test" { `, rName, errorOutputPrefix)) } +func testAccDeliveryStreamConfig_ExtendedS3_S3BackUpConfiguration_ErrorOutputPrefix(rName, errorOutputPrefix string) string { + return acctest.ConfigCompose( + testAccDeliveryStreamBaseConfig(rName), + fmt.Sprintf(` +resource "aws_kinesis_firehose_delivery_stream" "test" { + destination = "extended_s3" + name = %[1]q + + extended_s3_configuration { + bucket_arn = aws_s3_bucket.bucket.arn + role_arn = aws_iam_role.firehose.arn + s3_backup_mode = "Enabled" + s3_backup_configuration { + role_arn = aws_iam_role.firehose.arn + bucket_arn = aws_s3_bucket.bucket.arn + error_output_prefix = %[2]q + } + } + + depends_on = [aws_iam_role_policy.firehose] +} +`, rName, errorOutputPrefix)) +} + func testAccDeliveryStreamConfig_ExtendedS3_ProcessingConfiguration_Empty(rName string) string { return acctest.ConfigCompose( testAccDeliveryStreamBaseConfig(rName), @@ -2943,6 +3171,29 @@ resource "aws_kinesis_firehose_delivery_stream" "test" { `, rName)) } +func testAccDeliveryStreamConfig_Splunk_ErrorOutputPrefix(rName, errorOutputPrefix string) string { + return acctest.ConfigCompose( + testAccDeliveryStreamBaseConfig(rName), + fmt.Sprintf(` +resource "aws_kinesis_firehose_delivery_stream" "test" { + depends_on = [aws_iam_role_policy.firehose] + name = %[1]q + destination = "splunk" + + s3_configuration { + role_arn = aws_iam_role.firehose.arn + bucket_arn = aws_s3_bucket.bucket.arn + error_output_prefix = %[2]q + } + + splunk_configuration { + hec_endpoint = "https://input-test.com:443" + hec_token = "51D4DA16-C61B-4F5F-8EC7-ED4301342A4A" + } +} +`, rName, errorOutputPrefix)) +} + func testAccDeliveryStreamConfig_HTTPEndpointBasic(rName string) string { return acctest.ConfigCompose( testAccDeliveryStreamBaseConfig(rName), @@ -2966,6 +3217,30 @@ resource "aws_kinesis_firehose_delivery_stream" "test" { `, rName)) } +func testAccDeliveryStreamConfig_HTTPEndpoint_ErrorOutputPrefix(rName, errorOutputPrefix string) string { + return acctest.ConfigCompose( + testAccDeliveryStreamBaseConfig(rName), + fmt.Sprintf(` +resource "aws_kinesis_firehose_delivery_stream" "test" { + depends_on = [aws_iam_role_policy.firehose] + name = %[1]q + destination = "http_endpoint" + + s3_configuration { + role_arn = aws_iam_role.firehose.arn + bucket_arn = aws_s3_bucket.bucket.arn + error_output_prefix = %[2]q + } + + http_endpoint_configuration { + url = "https://input-test.com:443" + name = "HTTP_test" + role_arn = aws_iam_role.firehose.arn + } +} +`, rName, errorOutputPrefix)) +} + func testAccDeliveryStreamConfig_HTTPEndpoint_RetryDuration(rName string, retryDuration int) string { return acctest.ConfigCompose( testAccDeliveryStreamBaseConfig(rName), @@ -3219,6 +3494,32 @@ resource "aws_kinesis_firehose_delivery_stream" "test" { `, rName)) } +func testAccDeliveryStreamConfig_Elasticsearch_ErrorOutputPrefix(rName, errorOutputPrefix string) string { + return acctest.ConfigCompose( + testAccDeliveryStreamBaseElasticsearchConfig(rName), + 
fmt.Sprintf(`
+resource "aws_kinesis_firehose_delivery_stream" "test" {
+  depends_on = [aws_iam_role_policy.firehose-elasticsearch]
+
+  name        = %[1]q
+  destination = "elasticsearch"
+
+  s3_configuration {
+    role_arn            = aws_iam_role.firehose.arn
+    bucket_arn          = aws_s3_bucket.bucket.arn
+    error_output_prefix = %[2]q
+  }
+
+  elasticsearch_configuration {
+    domain_arn = aws_elasticsearch_domain.test_cluster.arn
+    role_arn   = aws_iam_role.firehose.arn
+    index_name = "test"
+    type_name  = "test"
+  }
+}
+`, rName, errorOutputPrefix))
+}
+
 func testAccDeliveryStreamConfig_ElasticsearchVPCBasic(rName string) string {
 	return acctest.ConfigCompose(
 		testAccDeliveryStreamBaseElasticsearchVPCConfig(rName),
diff --git a/website/docs/r/kinesis_firehose_delivery_stream.html.markdown b/website/docs/r/kinesis_firehose_delivery_stream.html.markdown
index 30af6175763..23c4c957499 100644
--- a/website/docs/r/kinesis_firehose_delivery_stream.html.markdown
+++ b/website/docs/r/kinesis_firehose_delivery_stream.html.markdown
@@ -402,16 +402,18 @@ The `server_side_encryption` object supports the following:
 * `key_type`- (Optional) Type of encryption key. Default is `AWS_OWNED_CMK`. Valid values are `AWS_OWNED_CMK` and `CUSTOMER_MANAGED_CMK`
 * `key_arn` - (Optional) Amazon Resource Name (ARN) of the encryption key. Required when `key_type` is `CUSTOMER_MANAGED_CMK`.
 
-The (DEPRECATED) `s3_configuration` object supports the following:
+The `s3_configuration` object supports the following:
+
+~> **NOTE:** This configuration block is deprecated for the `s3` destination.
 
 * `role_arn` - (Required) The ARN of the AWS credentials.
 * `bucket_arn` - (Required) The ARN of the S3 bucket
 * `prefix` - (Optional) The "YYYY/MM/DD/HH" time format prefix is automatically used for delivered S3 files. You can specify an extra prefix to be added in front of the time format prefix. Note that if the prefix ends with a slash, it appears as a folder in the S3 bucket
-* `error_output_prefix` - (Optional) Prefix added to failed records before writing them to S3. This prefix appears immediately following the bucket name.
 * `buffer_size` - (Optional) Buffer incoming data to the specified size, in MBs, before delivering it to the destination. The default value is 5. We recommend setting SizeInMBs to a value greater than the amount of data you typically ingest into the delivery stream in 10 seconds. For example, if you typically ingest data at 1 MB/sec set SizeInMBs to be 10 MB or higher.
 * `buffer_interval` - (Optional) Buffer incoming data for the specified period of time, in seconds, before delivering it to the destination. The default value is 300.
 * `compression_format` - (Optional) The compression format. If no value is specified, the default is `UNCOMPRESSED`. Other supported values are `GZIP`, `ZIP`, `Snappy`, & `HADOOP_SNAPPY`.
+* `error_output_prefix` - (Optional) Prefix added to failed records before writing them to S3. Not currently supported for the `redshift` destination. This prefix appears immediately following the bucket name. For information about how to specify this prefix, see [Custom Prefixes for Amazon S3 Objects](https://docs.aws.amazon.com/firehose/latest/dev/s3-prefixes.html).
 * `kms_key_arn` - (Optional) Specifies the KMS key ARN the stream will use to encrypt data. If not set, no encryption will be used.
 * `cloudwatch_logging_options` - (Optional) The CloudWatch Logging Options for the delivery stream. More details are given below
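
As a usage sketch for the `error_output_prefix` argument documented above: a static value such as `errors/` works as-is, and per the linked custom-prefixes guide the value may also embed the `!{firehose:error-output-type}` expression. The role and bucket references here are placeholders:

s3_configuration {
  role_arn   = aws_iam_role.firehose.arn
  bucket_arn = aws_s3_bucket.bucket.arn

  # Firehose replaces !{firehose:error-output-type} with the failure type
  # (for example, processing-failed), grouping error objects by cause.
  # Omit this argument for the redshift destination (see the note above).
  error_output_prefix = "errors/!{firehose:error-output-type}/"
}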