Skip to content

Commit

Permalink
Add S3 Connection Attributes to DMS Endpoint
Browse files Browse the repository at this point in the history
Adds a few commonly used parameters to the explicit S3 connection block.

Please note that IMO, we should not default those settings because they
are defaulted on AWS side. However, the API is a bit inconsistent in
which properties the API returns on GET if the resource was not
_created_ with particular resources (i.e.
`parquet_timestamp_in_millisecond` is always returned, `data_format` is
not if it was not explicitly used for creation, yet it defaults to
`csv`.

In order to make tests and usage of the provider a bit more sane, I
defaulted all properties.
  • Loading branch information
sbrandtb committed Jun 18, 2021
1 parent 9e308a2 commit ae101e6
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 16 deletions.
3 changes: 3 additions & 0 deletions .changelog/17591.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
resource/aws_dms_endpoint: Add `s3_settings.data_format`, `s3_settings.parquet_timestamp_in_millisecond`, `s3_settings.parquet_version`, `s3_settings.encryption_mode` and `s3_settings.server_side_encryption_kms_key_id` arguments.
```
76 changes: 60 additions & 16 deletions aws/resource_aws_dms_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,33 @@ func resourceAwsDmsEndpoint() *schema.Resource {
Optional: true,
Default: false,
},
"data_format": {
Type: schema.TypeString,
Optional: true,
Default: dms.DataFormatValueCsv,
ValidateFunc: validation.StringInSlice(dms.DataFormatValue_Values(), false),
},
"parquet_timestamp_in_millisecond": {
Type: schema.TypeBool,
Optional: true,
Default: false,
},
"parquet_version": {
Type: schema.TypeString,
Optional: true,
Default: dms.ParquetVersionValueParquet10,
ValidateFunc: validation.StringInSlice(dms.ParquetVersionValue_Values(), false),
},
"encryption_mode": {
Type: schema.TypeString,
Optional: true,
Default: "SSE_S3",
ValidateFunc: validation.StringInSlice([]string{"SSE_S3", "SSE_KMS"}, false),
},
"server_side_encryption_kms_key_id": {
Type: schema.TypeString,
Optional: true,
},
},
},
},
Expand Down Expand Up @@ -406,8 +433,13 @@ func resourceAwsDmsEndpointCreate(d *schema.ResourceData, meta interface{}) erro
CompressionType: aws.String(d.Get("s3_settings.0.compression_type").(string)),
CsvDelimiter: aws.String(d.Get("s3_settings.0.csv_delimiter").(string)),
CsvRowDelimiter: aws.String(d.Get("s3_settings.0.csv_row_delimiter").(string)),
DataFormat: aws.String(d.Get("s3_settings.0.data_format").(string)),
DatePartitionEnabled: aws.Bool(d.Get("s3_settings.0.date_partition_enabled").(bool)),
EncryptionMode: aws.String(d.Get("s3_settings.0.encryption_mode").(string)),
ExternalTableDefinition: aws.String(d.Get("s3_settings.0.external_table_definition").(string)),
ParquetTimestampInMillisecond: aws.Bool(d.Get("s3_settings.0.parquet_timestamp_in_millisecond").(bool)),
ParquetVersion: aws.String(d.Get("s3_settings.0.parquet_version").(string)),
ServerSideEncryptionKmsKeyId: aws.String(d.Get("s3_settings.0.server_side_encryption_kms_key_id").(string)),
ServiceAccessRoleArn: aws.String(d.Get("s3_settings.0.service_access_role_arn").(string)),
}
default:
Expand Down Expand Up @@ -648,15 +680,22 @@ func resourceAwsDmsEndpointUpdate(d *schema.ResourceData, meta interface{}) erro
if d.HasChanges(
"s3_settings.0.service_access_role_arn", "s3_settings.0.external_table_definition",
"s3_settings.0.csv_row_delimiter", "s3_settings.0.csv_delimiter", "s3_settings.0.bucket_folder",
"s3_settings.0.bucket_name", "s3_settings.0.compression_type") {
"s3_settings.0.bucket_name", "s3_settings.0.compression_type", "s3_settings.0.data_format",
"s3_settings.0.parquet_version", "s3_settings.0.parquet_timestamp_in_millisecond",
"s3_settings.0.encryption_mode", "s3_settings.0.server_side_encryption_kms_key_id") {
request.S3Settings = &dms.S3Settings{
BucketFolder: aws.String(d.Get("s3_settings.0.bucket_folder").(string)),
BucketName: aws.String(d.Get("s3_settings.0.bucket_name").(string)),
CompressionType: aws.String(d.Get("s3_settings.0.compression_type").(string)),
CsvDelimiter: aws.String(d.Get("s3_settings.0.csv_delimiter").(string)),
CsvRowDelimiter: aws.String(d.Get("s3_settings.0.csv_row_delimiter").(string)),
ExternalTableDefinition: aws.String(d.Get("s3_settings.0.external_table_definition").(string)),
ServiceAccessRoleArn: aws.String(d.Get("s3_settings.0.service_access_role_arn").(string)),
BucketFolder: aws.String(d.Get("s3_settings.0.bucket_folder").(string)),
BucketName: aws.String(d.Get("s3_settings.0.bucket_name").(string)),
CompressionType: aws.String(d.Get("s3_settings.0.compression_type").(string)),
CsvDelimiter: aws.String(d.Get("s3_settings.0.csv_delimiter").(string)),
CsvRowDelimiter: aws.String(d.Get("s3_settings.0.csv_row_delimiter").(string)),
DataFormat: aws.String(d.Get("s3_settings.0.data_format").(string)),
EncryptionMode: aws.String(d.Get("s3_settings.0.encryption_mode").(string)),
ExternalTableDefinition: aws.String(d.Get("s3_settings.0.external_table_definition").(string)),
ParquetTimestampInMillisecond: aws.Bool(d.Get("s3_settings.0.parquet_timestamp_in_millisecond").(bool)),
ParquetVersion: aws.String(d.Get("s3_settings.0.parquet_version").(string)),
ServerSideEncryptionKmsKeyId: aws.String(d.Get("s3_settings.0.server_side_encryption_kms_key_id").(string)),
ServiceAccessRoleArn: aws.String(d.Get("s3_settings.0.service_access_role_arn").(string)),
}
request.EngineName = aws.String(d.Get("engine_name").(string)) // Must be included (should be 's3')
hasChanges = true
Expand Down Expand Up @@ -843,14 +882,19 @@ func flattenDmsS3Settings(settings *dms.S3Settings) []map[string]interface{} {
}

m := map[string]interface{}{
"bucket_folder": aws.StringValue(settings.BucketFolder),
"bucket_name": aws.StringValue(settings.BucketName),
"compression_type": aws.StringValue(settings.CompressionType),
"csv_delimiter": aws.StringValue(settings.CsvDelimiter),
"csv_row_delimiter": aws.StringValue(settings.CsvRowDelimiter),
"date_partition_enabled": aws.BoolValue(settings.DatePartitionEnabled),
"external_table_definition": aws.StringValue(settings.ExternalTableDefinition),
"service_access_role_arn": aws.StringValue(settings.ServiceAccessRoleArn),
"bucket_folder": aws.StringValue(settings.BucketFolder),
"bucket_name": aws.StringValue(settings.BucketName),
"compression_type": aws.StringValue(settings.CompressionType),
"csv_delimiter": aws.StringValue(settings.CsvDelimiter),
"csv_row_delimiter": aws.StringValue(settings.CsvRowDelimiter),
"data_format": aws.StringValue(settings.DataFormat),
"date_partition_enabled": aws.BoolValue(settings.DatePartitionEnabled),
"encryption_mode": aws.StringValue(settings.EncryptionMode),
"external_table_definition": aws.StringValue(settings.ExternalTableDefinition),
"parquet_timestamp_in_millisecond": aws.BoolValue(settings.ParquetTimestampInMillisecond),
"parquet_version": aws.StringValue(settings.ParquetVersion),
"server_side_encryption_kms_key_id": aws.StringValue(settings.ServerSideEncryptionKmsKeyId),
"service_access_role_arn": aws.StringValue(settings.ServiceAccessRoleArn),
}

return []map[string]interface{}{m}
Expand Down
5 changes: 5 additions & 0 deletions aws/resource_aws_dms_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ func TestAccAwsDmsEndpoint_S3(t *testing.T) {
resource.TestCheckResourceAttr(resourceName, "s3_settings.0.bucket_folder", ""),
resource.TestCheckResourceAttr(resourceName, "s3_settings.0.bucket_name", "bucket_name"),
resource.TestCheckResourceAttr(resourceName, "s3_settings.0.compression_type", "NONE"),
resource.TestCheckResourceAttr(resourceName, "s3_settings.0.data_format", "csv"),
resource.TestCheckResourceAttr(resourceName, "s3_settings.0.parquet_version", "parquet-1-0"),
resource.TestCheckResourceAttr(resourceName, "s3_settings.0.parquet_timestamp_in_millisecond", "false"),
resource.TestCheckResourceAttr(resourceName, "s3_settings.0.encryption_mode", "SSE_S3"),
resource.TestCheckResourceAttr(resourceName, "s3_settings.0.server_side_encryption_kms_key_id", ""),
),
},
{
Expand Down
5 changes: 5 additions & 0 deletions website/docs/r/dms_endpoint.html.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,13 @@ The `s3_settings` configuration block supports the following arguments:
* `compression_type` - (Optional) Set to compress target files. Defaults to `NONE`. Valid values are `GZIP` and `NONE`.
* `csv_delimiter` - (Optional) Delimiter used to separate columns in the source files. Defaults to `,`.
* `csv_row_delimiter` - (Optional) Delimiter used to separate rows in the source files. Defaults to `\n`.
* `data_format` - (Optional) The output format for the files that AWS DMS uses to create S3 objects. Defaults to `csv`. Valid values are `csv` and `parquet`.
* `date_partition_enabled` - (Optional) Partition S3 bucket folders based on transaction commit dates. Defaults to `false`.
* `encryption_mode` - (Optional) The server-side encryption mode that you want to encrypt your .csv or .parquet object files copied to S3. Defaults to `SSE_S3`. Valid values are `SSE_S3` and `SSE_KMS`.
* `external_table_definition` - (Optional) JSON document that describes how AWS DMS should interpret the data.
* `parquet_timestamp_in_millisecond` - (Optional) - Specifies the precision of any TIMESTAMP column values written to an S3 object file in .parquet format. Defaults to `false`.
* `parquet_version` - (Optional) The version of the .parquet file format. Defaults to `parquet-1-0`. Valid values are `parquet-1-0` and `parquet-2-0`.
* `server_side_encryption_kms_key_id` - (Optional) If you set encryptionMode to `SSE_KMS`, set this parameter to the Amazon Resource Name (ARN) for the AWS KMS key.
* `service_access_role_arn` - (Optional) Amazon Resource Name (ARN) of the IAM Role with permissions to read from or write to the S3 Bucket.

## Attributes Reference
Expand Down

0 comments on commit ae101e6

Please sign in to comment.