Skip to content

Commit

Permalink
Merge pull request #27616 from kristofmartens/b_aws_appflow_flow_s3_output_add_preserve_source_data_typing
Browse files Browse the repository at this point in the history

Add the option to preserve the source data type for S3 output format…
  • Loading branch information
johnsonaj authored Mar 10, 2023
2 parents 4f68756 + 07201e9 commit 9b0cd00
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .changelog/27616.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
resource/aws_appflow_flow: Add attribute `preserve_source_data_typing` to `s3_output_format_config` in `s3`
```
12 changes: 12 additions & 0 deletions internal/service/appflow/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,10 @@ func ResourceFlow() *schema.Resource {
},
},
},
"preserve_source_data_typing": {
Type: schema.TypeBool,
Optional: true,
},
},
},
},
Expand Down Expand Up @@ -1726,6 +1730,10 @@ func expandS3OutputFormatConfig(tfMap map[string]interface{}) *appflow.S3OutputF
a.PrefixConfig = expandPrefixConfig(v[0].(map[string]interface{}))
}

if v, ok := tfMap["preserve_source_data_typing"].(bool); ok {
a.PreserveSourceDataTyping = aws.Bool(v)
}

return a
}

Expand Down Expand Up @@ -2830,6 +2838,10 @@ func flattenS3OutputFormatConfig(s3OutputFormatConfig *appflow.S3OutputFormatCon
m["prefix_config"] = []interface{}{flattenPrefixConfig(v)}
}

if v := s3OutputFormatConfig.PreserveSourceDataTyping; v != nil {
m["preserve_source_data_typing"] = aws.BoolValue(v)
}

return m
}

Expand Down
117 changes: 117 additions & 0 deletions internal/service/appflow/flow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,53 @@ func TestAccAppFlowFlow_basic(t *testing.T) {
})
}

// TestAccAppFlowFlow_S3_outputFormatConfig_ParquetFileType verifies that the
// preserve_source_data_typing attribute of the S3 destination's
// s3_output_format_config is applied and stored in state for the PARQUET file
// type, first enabled and then disabled.
func TestAccAppFlowFlow_S3_outputFormatConfig_ParquetFileType(t *testing.T) {
	ctx := acctest.Context(t)
	var flowOutput appflow.FlowDefinition
	rSourceName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)
	rDestinationName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)
	rFlowName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)
	resourceName := "aws_appflow_flow.test"
	scheduleStartTime := time.Now().UTC().AddDate(0, 0, 1).Format(time.RFC3339)

	// Both steps share the same configuration and checks; only the value of
	// preserve_source_data_typing differs between them.
	newStep := func(preserve bool) resource.TestStep {
		want := "false"
		if preserve {
			want = "true"
		}
		return resource.TestStep{
			Config: testAccFlowConfig_S3_OutputFormatConfig_ParquetFileType(rSourceName, rDestinationName, rFlowName, scheduleStartTime, "PARQUET", preserve),
			Check: resource.ComposeAggregateTestCheckFunc(
				testAccCheckFlowExists(ctx, resourceName, &flowOutput),
				resource.TestCheckResourceAttrSet(resourceName, "destination_flow_config.#"),
				resource.TestCheckResourceAttrSet(resourceName, "destination_flow_config.0.connector_type"),
				resource.TestCheckResourceAttrSet(resourceName, "destination_flow_config.0.destination_connector_properties.#"),
				resource.TestCheckResourceAttr(resourceName, "destination_flow_config.0.destination_connector_properties.0.s3.0.s3_output_format_config.0.preserve_source_data_typing", want),
				resource.TestCheckResourceAttr(resourceName, "destination_flow_config.0.destination_connector_properties.0.s3.0.s3_output_format_config.0.file_type", "PARQUET"),
				resource.TestCheckResourceAttrSet(resourceName, "task.#"),
				resource.TestCheckResourceAttrSet(resourceName, "task.0.source_fields.#"),
				resource.TestCheckResourceAttrSet(resourceName, "task.0.task_type"),
			),
		}
	}

	resource.ParallelTest(t, resource.TestCase{
		PreCheck:                 func() { acctest.PreCheck(t) },
		ErrorCheck:               acctest.ErrorCheck(t, appflow.EndpointsID),
		ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories,
		CheckDestroy:             testAccCheckFlowDestroy(ctx),
		Steps:                    []resource.TestStep{newStep(true), newStep(false)},
	})
}

func TestAccAppFlowFlow_update(t *testing.T) {
ctx := acctest.Context(t)
var flowOutput appflow.FlowDefinition
Expand Down Expand Up @@ -344,6 +391,76 @@ resource "aws_appflow_flow" "test" {
)
}

// testAccFlowConfig_S3_OutputFormatConfig_ParquetFileType composes the shared
// source/destination bucket fixtures from testAccFlowConfig_base with an
// aws_appflow_flow whose S3 destination sets file_type and
// preserve_source_data_typing inside s3_output_format_config.
//
// Sprintf arguments: %[1]q flow name, %[2]q schedule start time,
// %[3]q file type (callers pass "PARQUET"), %[4]t preserve flag.
func testAccFlowConfig_S3_OutputFormatConfig_ParquetFileType(rSourceName, rDestinationName, rFlowName, scheduleStartTime, fileType string, preserveSourceDataTyping bool) string {
	return acctest.ConfigCompose(
		testAccFlowConfig_base(rSourceName, rDestinationName),
		fmt.Sprintf(`
resource "aws_appflow_flow" "test" {
  name = %[1]q
  source_flow_config {
    connector_type = "S3"
    source_connector_properties {
      s3 {
        bucket_name   = aws_s3_bucket_policy.test_source.bucket
        bucket_prefix = "flow"
      }
    }
  }
  destination_flow_config {
    connector_type = "S3"
    destination_connector_properties {
      s3 {
        bucket_name = aws_s3_bucket_policy.test_destination.bucket
        s3_output_format_config {
          prefix_config {
            prefix_type = "PATH"
          }
          file_type                   = %[3]q
          preserve_source_data_typing = %[4]t
          aggregation_config {
            aggregation_type = "None"
          }
        }
      }
    }
  }
  task {
    source_fields     = ["testField"]
    destination_field = "testField"
    task_type         = "Map"
    task_properties = {
      "DESTINATION_DATA_TYPE" = "string"
      "SOURCE_DATA_TYPE"      = "string"
    }
    connector_operator {
      s3 = "NO_OP"
    }
  }
  trigger_config {
    trigger_type = "Scheduled"
    trigger_properties {
      scheduled {
        data_pull_mode      = "Incremental"
        schedule_expression = "rate(3hours)"
        schedule_start_time = %[2]q
      }
    }
  }
}
`, rFlowName, scheduleStartTime, fileType, preserveSourceDataTyping),
	)
}

func testAccFlowConfig_update(rSourceName string, rDestinationName string, rFlowName string, description string) string {
return acctest.ConfigCompose(
testAccFlowConfig_base(rSourceName, rDestinationName),
Expand Down
1 change: 1 addition & 0 deletions website/docs/r/appflow_flow.html.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ EventBridge, Honeycode, and Marketo destination properties all support the follo
* `aggregation_config` - (Optional) Aggregation settings that you can use to customize the output format of your flow data. See [Aggregation Config](#aggregation-config) for more details.
* `file_type` - (Optional) File type that Amazon AppFlow places in the Amazon S3 bucket. Valid values are `CSV`, `JSON`, and `PARQUET`.
* `prefix_config` - (Optional) Determines the prefix that Amazon AppFlow applies to the folder name in the Amazon S3 bucket. You can name folders according to the flow frequency and date. See [Prefix Config](#prefix-config) for more details.
* `preserve_source_data_typing` - (Optional, Boolean) Whether the data types from the source system need to be preserved. Only valid when `file_type` is `PARQUET`.

##### Salesforce Destination Properties

Expand Down

0 comments on commit 9b0cd00

Please sign in to comment.