Skip to content

Commit

Permalink
azurerm_data_factory_data_flow, `azurerm_data_factory_flowlet_data_flow` - support `flowlet` (#16987)
Browse files Browse the repository at this point in the history
  • Loading branch information
shu-ying789 authored Aug 18, 2022
1 parent b645801 commit f4b8e64
Show file tree
Hide file tree
Showing 7 changed files with 1,228 additions and 9 deletions.
97 changes: 97 additions & 0 deletions internal/services/datafactory/data_factory_data_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,35 @@ func SchemaForDataFlowSourceAndSink() *pluginsdk.Schema {
},
},

"flowlet": {
Type: pluginsdk.TypeList,
Optional: true,
MaxItems: 1,
Elem: &pluginsdk.Resource{
Schema: map[string]*pluginsdk.Schema{
"name": {
Type: pluginsdk.TypeString,
Required: true,
ValidateFunc: validation.StringIsNotEmpty,
},

"parameters": {
Type: pluginsdk.TypeMap,
Optional: true,
Elem: &pluginsdk.Schema{
Type: pluginsdk.TypeString,
},
},

"dataset_parameters": {
Type: pluginsdk.TypeString,
Optional: true,
ValidateFunc: validation.StringIsNotEmpty,
},
},
},
},

"linked_service": {
Type: pluginsdk.TypeList,
Optional: true,
Expand Down Expand Up @@ -139,6 +168,35 @@ func SchemaForDataFlowSourceTransformation() *pluginsdk.Schema {
},
},

"flowlet": {
Type: pluginsdk.TypeList,
Optional: true,
MaxItems: 1,
Elem: &pluginsdk.Resource{
Schema: map[string]*pluginsdk.Schema{
"name": {
Type: pluginsdk.TypeString,
Required: true,
ValidateFunc: validation.StringIsNotEmpty,
},

"parameters": {
Type: pluginsdk.TypeMap,
Optional: true,
Elem: &pluginsdk.Schema{
Type: pluginsdk.TypeString,
},
},

"dataset_parameters": {
Type: pluginsdk.TypeString,
Optional: true,
ValidateFunc: validation.StringIsNotEmpty,
},
},
},
},

"linked_service": {
Type: pluginsdk.TypeList,
Optional: true,
Expand Down Expand Up @@ -180,6 +238,7 @@ func expandDataFactoryDataFlowSource(input []interface{}) *[]datafactory.DataFlo
Dataset: expandDataFactoryDatasetReference(raw["dataset"].([]interface{})),
LinkedService: expandDataFactoryLinkedServiceReference(raw["linked_service"].([]interface{})),
SchemaLinkedService: expandDataFactoryLinkedServiceReference(raw["schema_linked_service"].([]interface{})),
Flowlet: expandDataFactoryDataFlowReference(raw["flowlet"].([]interface{})),
})
}
return &result
Expand All @@ -199,6 +258,7 @@ func expandDataFactoryDataFlowSink(input []interface{}) *[]datafactory.DataFlowS
Dataset: expandDataFactoryDatasetReference(raw["dataset"].([]interface{})),
LinkedService: expandDataFactoryLinkedServiceReference(raw["linked_service"].([]interface{})),
SchemaLinkedService: expandDataFactoryLinkedServiceReference(raw["schema_linked_service"].([]interface{})),
Flowlet: expandDataFactoryDataFlowReference(raw["flowlet"].([]interface{})),
})
}
return &result
Expand All @@ -217,6 +277,7 @@ func expandDataFactoryDataFlowTransformation(input []interface{}) *[]datafactory
Name: utils.String(raw["name"].(string)),
Dataset: expandDataFactoryDatasetReference(raw["dataset"].([]interface{})),
LinkedService: expandDataFactoryLinkedServiceReference(raw["linked_service"].([]interface{})),
Flowlet: expandDataFactoryDataFlowReference(raw["flowlet"].([]interface{})),
})
}
return &result
Expand Down Expand Up @@ -248,6 +309,20 @@ func expandDataFactoryLinkedServiceReference(input []interface{}) *datafactory.L
}
}

// expandDataFactoryDataFlowReference maps the single-item `flowlet`
// configuration list onto the SDK's DataFlowReference model. When the
// block is absent (empty or nil list) it returns nil so the reference
// is omitted from the API payload entirely.
func expandDataFactoryDataFlowReference(input []interface{}) *datafactory.DataFlowReference {
	if len(input) == 0 || input[0] == nil {
		return nil
	}

	v := input[0].(map[string]interface{})

	reference := datafactory.DataFlowReference{
		// The API requires the discriminator to be this fixed value.
		Type:              utils.String("DataFlowReference"),
		ReferenceName:     utils.String(v["name"].(string)),
		Parameters:        v["parameters"].(map[string]interface{}),
		DatasetParameters: utils.String(v["dataset_parameters"].(string)),
	}
	return &reference
}

func flattenDataFactoryDataFlowSource(input *[]datafactory.DataFlowSource) []interface{} {
if input == nil {
return []interface{}{}
Expand All @@ -269,6 +344,7 @@ func flattenDataFactoryDataFlowSource(input *[]datafactory.DataFlowSource) []int
"dataset": flattenDataFactoryDatasetReference(v.Dataset),
"linked_service": flattenDataFactoryLinkedServiceReference(v.LinkedService),
"schema_linked_service": flattenDataFactoryLinkedServiceReference(v.SchemaLinkedService),
"flowlet": flattenDataFactoryDataFlowReference(v.Flowlet),
})
}
return result
Expand All @@ -295,6 +371,7 @@ func flattenDataFactoryDataFlowSink(input *[]datafactory.DataFlowSink) []interfa
"dataset": flattenDataFactoryDatasetReference(v.Dataset),
"linked_service": flattenDataFactoryLinkedServiceReference(v.LinkedService),
"schema_linked_service": flattenDataFactoryLinkedServiceReference(v.SchemaLinkedService),
"flowlet": flattenDataFactoryDataFlowReference(v.Flowlet),
})
}
return result
Expand All @@ -320,6 +397,7 @@ func flattenDataFactoryDataFlowTransformation(input *[]datafactory.Transformatio
"description": description,
"dataset": flattenDataFactoryDatasetReference(v.Dataset),
"linked_service": flattenDataFactoryLinkedServiceReference(v.LinkedService),
"flowlet": flattenDataFactoryDataFlowReference(v.Flowlet),
})
}
return result
Expand Down Expand Up @@ -360,3 +438,22 @@ func flattenDataFactoryLinkedServiceReference(input *datafactory.LinkedServiceRe
},
}
}

// flattenDataFactoryDataFlowReference converts an SDK DataFlowReference
// back into the single-item list form used by the `flowlet` schema block.
// A nil reference flattens to an empty list, i.e. the block is unset in
// state. Parameters and DatasetParameters are passed through unchanged.
func flattenDataFactoryDataFlowReference(input *datafactory.DataFlowReference) []interface{} {
	if input == nil {
		return []interface{}{}
	}

	referenceName := ""
	if v := input.ReferenceName; v != nil {
		referenceName = *v
	}

	flowlet := map[string]interface{}{
		"name":               referenceName,
		"parameters":         input.Parameters,
		"dataset_parameters": input.DatasetParameters,
	}
	return []interface{}{flowlet}
}
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (r DataFlowResource) complete(data acceptance.TestData) string {
%s
resource "azurerm_data_factory_data_flow" "test" {
name = "acctestdf%d"
name = "acctestdf3%[2]d"
data_factory_id = azurerm_data_factory.test.id
description = "description for data flow"
annotations = ["anno1", "anno2"]
Expand All @@ -184,6 +184,13 @@ resource "azurerm_data_factory_data_flow" "test" {
name = "source1"
description = "description for source1"
flowlet {
name = azurerm_data_factory_flowlet_data_flow.test1.name
parameters = {
"Key1" = "value1"
}
}
linked_service {
name = azurerm_data_factory_linked_custom_service.test.name
parameters = {
Expand All @@ -203,6 +210,13 @@ resource "azurerm_data_factory_data_flow" "test" {
name = "sink1"
description = "description for sink1"
flowlet {
name = azurerm_data_factory_flowlet_data_flow.test2.name
parameters = {
"Key1" = "value1"
}
}
linked_service {
name = azurerm_data_factory_linked_custom_service.test.name
parameters = {
Expand Down Expand Up @@ -303,6 +317,60 @@ Filter1 sink(allowSchemaDrift: true,
partitionBy('roundRobin', 3)) ~> sink1
EOT
}
resource "azurerm_data_factory_flowlet_data_flow" "test1" {
name = "acctest1fdf%[2]d"
data_factory_id = azurerm_data_factory.test.id
source {
name = "source1"
}
sink {
name = "sink1"
}
script = <<EOT
source(
allowSchemaDrift: true,
validateSchema: false,
limit: 100,
ignoreNoFilesFound: false,
documentForm: 'documentPerLine') ~> source1
source1 sink(
allowSchemaDrift: true,
validateSchema: false,
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true) ~> sink1
EOT
}
resource "azurerm_data_factory_flowlet_data_flow" "test2" {
name = "acctest2fdf%[2]d"
data_factory_id = azurerm_data_factory.test.id
source {
name = "source1"
}
sink {
name = "sink1"
}
script = <<EOT
source(
allowSchemaDrift: true,
validateSchema: false,
limit: 100,
ignoreNoFilesFound: false,
documentForm: 'documentPerLine') ~> source1
source1 sink(
allowSchemaDrift: true,
validateSchema: false,
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true) ~> sink1
EOT
}
`, r.template(data), data.RandomInteger)
}

Expand All @@ -313,26 +381,26 @@ provider "azurerm" {
}
resource "azurerm_resource_group" "test" {
name = "acctestRG-df-%d"
location = "%s"
name = "acctestRG-df-%[1]d"
location = "%[2]s"
}
resource "azurerm_storage_account" "test" {
name = "acctestsa%s"
name = "acctestsa%[3]s"
location = azurerm_resource_group.test.location
resource_group_name = azurerm_resource_group.test.name
account_tier = "Standard"
account_replication_type = "LRS"
}
resource "azurerm_data_factory" "test" {
name = "acctestdf%d"
name = "acctestdf%[1]d"
location = azurerm_resource_group.test.location
resource_group_name = azurerm_resource_group.test.name
}
resource "azurerm_data_factory_linked_custom_service" "test" {
name = "acctestls%d"
name = "acctestls%[1]d"
data_factory_id = azurerm_data_factory.test.id
type = "AzureBlobStorage"
type_properties_json = <<JSON
Expand All @@ -343,7 +411,7 @@ JSON
}
resource "azurerm_data_factory_dataset_json" "test1" {
name = "acctestds1%d"
name = "acctestds1%[1]d"
data_factory_id = azurerm_data_factory.test.id
linked_service_name = azurerm_data_factory_linked_custom_service.test.name
Expand All @@ -357,7 +425,7 @@ resource "azurerm_data_factory_dataset_json" "test1" {
}
resource "azurerm_data_factory_dataset_json" "test2" {
name = "acctestds2%d"
name = "acctestds2%[1]d"
data_factory_id = azurerm_data_factory.test.id
linked_service_name = azurerm_data_factory_linked_custom_service.test.name
Expand All @@ -369,5 +437,5 @@ resource "azurerm_data_factory_dataset_json" "test2" {
encoding = "UTF-8"
}
`, data.RandomInteger, data.Locations.Primary, data.RandomString, data.RandomInteger, data.RandomInteger, data.RandomInteger, data.RandomInteger)
`, data.RandomInteger, data.Locations.Primary, data.RandomString)
}
Loading

0 comments on commit f4b8e64

Please sign in to comment.