Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding support for error_output_prefix to the aws_kinesis_firehose_delivery_stream s3 destination #11229

Merged
merged 4 commits into from
Dec 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .changelog/11229.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
```release-note:enhancement
resource/aws_kinesis_firehose_delivery_stream: Add `error_output_prefix` argument to `s3_configuration` configuration block
```

```release-note:enhancement
resource/aws_kinesis_firehose_delivery_stream: Add `error_output_prefix` argument to `redshift_configuration` `s3_backup_configuration` configuration block
```

```release-note:enhancement
resource/aws_kinesis_firehose_delivery_stream: Add `error_output_prefix` argument to `extended_s3_configuration` `s3_backup_configuration` configuration block
```
43 changes: 35 additions & 8 deletions internal/service/firehose/delivery_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,12 @@ func s3ConfigurationSchema() *schema.Schema {
ValidateFunc: validation.StringInSlice(firehose.CompressionFormat_Values(), false),
},

"error_output_prefix": {
Type: schema.TypeString,
Optional: true,
ValidateFunc: validation.StringLenBetween(0, 1024),
},

"kms_key_arn": {
Type: schema.TypeString,
Optional: true,
Expand Down Expand Up @@ -378,6 +384,7 @@ func flattenS3Configuration(description *firehose.S3DestinationDescription) []ma
"bucket_arn": aws.StringValue(description.BucketARN),
"cloudwatch_logging_options": flattenCloudwatchLoggingOptions(description.CloudWatchLoggingOptions),
"compression_format": aws.StringValue(description.CompressionFormat),
"error_output_prefix": aws.StringValue(description.ErrorOutputPrefix),
"prefix": aws.StringValue(description.Prefix),
"role_arn": aws.StringValue(description.RoleARN),
}
Expand Down Expand Up @@ -1225,8 +1232,9 @@ func ResourceDeliveryStream() *schema.Resource {
},

"error_output_prefix": {
Type: schema.TypeString,
Optional: true,
Type: schema.TypeString,
Optional: true,
ValidateFunc: validation.StringLenBetween(0, 1024),
},

"kms_key_arn": {
Expand Down Expand Up @@ -1617,6 +1625,10 @@ func createS3Config(d *schema.ResourceData) *firehose.S3DestinationConfiguration
EncryptionConfiguration: extractEncryptionConfiguration(s3),
}

if v, ok := s3["error_output_prefix"].(string); ok && v != "" {
configuration.ErrorOutputPrefix = aws.String(v)
}

if _, ok := s3["cloudwatch_logging_options"]; ok {
configuration.CloudWatchLoggingOptions = extractCloudWatchLoggingConfiguration(s3)
}
Expand Down Expand Up @@ -1648,6 +1660,10 @@ func expandS3BackupConfig(d map[string]interface{}) *firehose.S3DestinationConfi
configuration.CloudWatchLoggingOptions = extractCloudWatchLoggingConfiguration(s3)
}

if v, ok := s3["error_output_prefix"].(string); ok && v != "" {
configuration.ErrorOutputPrefix = aws.String(v)
}

return configuration
}

Expand Down Expand Up @@ -1679,8 +1695,8 @@ func createExtendedS3Config(d *schema.ResourceData) *firehose.ExtendedS3Destinat
configuration.CloudWatchLoggingOptions = extractCloudWatchLoggingConfiguration(s3)
}

if v, ok := s3["error_output_prefix"]; ok && v.(string) != "" {
configuration.ErrorOutputPrefix = aws.String(v.(string))
if v, ok := s3["error_output_prefix"].(string); ok && v != "" {
configuration.ErrorOutputPrefix = aws.String(v)
}

if s3BackupMode, ok := s3["s3_backup_mode"]; ok {
Expand All @@ -1701,6 +1717,7 @@ func updateS3Config(d *schema.ResourceData) *firehose.S3DestinationUpdate {
IntervalInSeconds: aws.Int64((int64)(s3["buffer_interval"].(int))),
SizeInMBs: aws.Int64((int64)(s3["buffer_size"].(int))),
},
ErrorOutputPrefix: aws.String(s3["error_output_prefix"].(string)),
Prefix: extractPrefixConfiguration(s3),
CompressionFormat: aws.String(s3["compression_format"].(string)),
EncryptionConfiguration: extractEncryptionConfiguration(s3),
Expand Down Expand Up @@ -1729,6 +1746,7 @@ func updateS3BackupConfig(d map[string]interface{}) *firehose.S3DestinationUpdat
IntervalInSeconds: aws.Int64((int64)(s3["buffer_interval"].(int))),
SizeInMBs: aws.Int64((int64)(s3["buffer_size"].(int))),
},
ErrorOutputPrefix: aws.String(s3["error_output_prefix"].(string)),
Prefix: extractPrefixConfiguration(s3),
CompressionFormat: aws.String(s3["compression_format"].(string)),
EncryptionConfiguration: extractEncryptionConfiguration(s3),
Expand All @@ -1752,6 +1770,7 @@ func updateExtendedS3Config(d *schema.ResourceData) *firehose.ExtendedS3Destinat
IntervalInSeconds: aws.Int64((int64)(s3["buffer_interval"].(int))),
SizeInMBs: aws.Int64((int64)(s3["buffer_size"].(int))),
},
ErrorOutputPrefix: aws.String(s3["error_output_prefix"].(string)),
Prefix: extractPrefixConfiguration(s3),
CompressionFormat: aws.String(s3["compression_format"].(string)),
EncryptionConfiguration: extractEncryptionConfiguration(s3),
Expand All @@ -1768,10 +1787,6 @@ func updateExtendedS3Config(d *schema.ResourceData) *firehose.ExtendedS3Destinat
configuration.DynamicPartitioningConfiguration = extractDynamicPartitioningConfiguration(s3)
}

if v, ok := s3["error_output_prefix"]; ok && v.(string) != "" {
configuration.ErrorOutputPrefix = aws.String(v.(string))
}

if s3BackupMode, ok := s3["s3_backup_mode"]; ok {
configuration.S3BackupMode = aws.String(s3BackupMode.(string))
configuration.S3BackupUpdate = updateS3BackupConfig(d.Get("extended_s3_configuration").([]interface{})[0].(map[string]interface{}))
Expand Down Expand Up @@ -2162,6 +2177,12 @@ func updateRedshiftConfig(d *schema.ResourceData, s3Update *firehose.S3Destinati
if s3BackupMode, ok := redshift["s3_backup_mode"]; ok {
configuration.S3BackupMode = aws.String(s3BackupMode.(string))
configuration.S3BackupUpdate = updateS3BackupConfig(d.Get("redshift_configuration").([]interface{})[0].(map[string]interface{}))
if configuration.S3BackupUpdate != nil {
// Redshift does not currently support ErrorOutputPrefix,
// which is set to the empty string within "updateS3BackupConfig",
// thus we must remove it here to avoid an InvalidArgumentException.
configuration.S3BackupUpdate.ErrorOutputPrefix = nil
}
}

return configuration, nil
Expand Down Expand Up @@ -2722,6 +2743,12 @@ func resourceDeliveryStreamUpdate(d *schema.ResourceData, meta interface{}) erro
}
updateInput.ElasticsearchDestinationUpdate = esUpdate
} else if d.Get("destination").(string) == destinationTypeRedshift {
// Redshift does not currently support ErrorOutputPrefix,
// which is set to the empty string within "updateS3Config",
// thus we must remove it here to avoid an InvalidArgumentException.
if s3Config != nil {
s3Config.ErrorOutputPrefix = nil
}
rc, err := updateRedshiftConfig(d, s3Config)
if err != nil {
return err
Expand Down
Loading