Support for launching HDInsight cluster with Data Lake Gen2 Filesystem
Daniel Intskirveli committed Oct 16, 2019
1 parent d8fefed commit 519873a
Showing 10 changed files with 104 additions and 34 deletions.
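
In Terraform configuration terms, the change lets a `storage_account` block point at a Data Lake Storage Gen2 filesystem through a user-assigned managed identity, as an alternative to the existing blob-container-plus-access-key form. A minimal sketch of the two mutually exclusive forms the expander accepts (resource names are illustrative placeholders; only one block per cluster would normally be marked as the default):

```hcl
# Existing form: WASB blob storage, addressed by container ID and access key.
storage_account {
  storage_container_id = azurerm_storage_container.example.id
  storage_account_key  = azurerm_storage_account.example.primary_access_key
  is_default           = true
}

# New form: Data Lake Storage Gen2, addressed by filesystem ID, storage account
# resource ID, and a user-assigned managed identity.
storage_account {
  filesystem_id                = azurerm_storage_data_lake_gen2_filesystem.example.id
  storage_resource_id          = azurerm_storage_account.example.id
  managed_identity_resource_id = azurerm_user_assigned_identity.example.id
  is_default                   = true
}
```
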
88 changes: 72 additions & 16 deletions azurerm/helpers/azure/hdinsight.go
@@ -157,14 +157,32 @@ func SchemaHDInsightsStorageAccounts() *schema.Schema {
Schema: map[string]*schema.Schema{
"storage_account_key": {
Type: schema.TypeString,
Required: true,
Optional: true,
ForceNew: true,
Sensitive: true,
ValidateFunc: validate.NoEmptyStrings,
},
"storage_container_id": {
Type: schema.TypeString,
Required: true,
Optional: true,
ForceNew: true,
ValidateFunc: validate.NoEmptyStrings,
},
"filesystem_id": {
Type: schema.TypeString,
Optional: true,
ForceNew: true,
ValidateFunc: validate.NoEmptyStrings,
},
"storage_resource_id": {
Type: schema.TypeString,
Optional: true,
ForceNew: true,
ValidateFunc: validate.NoEmptyStrings,
},
"managed_identity_resource_id": {
Type: schema.TypeString,
Optional: true,
ForceNew: true,
ValidateFunc: validate.NoEmptyStrings,
},
@@ -178,32 +178,70 @@ func SchemaHDInsightsStorageAccounts() *schema.Schema {
}
}

func ExpandHDInsightsStorageAccounts(input []interface{}) (*[]hdinsight.StorageAccount, error) {
// ExpandHDInsightsStorageAccounts returns a slice of StorageAccount structs, as well as a ClusterIdentity
// populated with any user-assigned managed identities required for accessing Data Lake Gen2 storage.
func ExpandHDInsightsStorageAccounts(input []interface{}) (*[]hdinsight.StorageAccount, *hdinsight.ClusterIdentity, error) {
results := make([]hdinsight.StorageAccount, 0)

var clusterIdentity *hdinsight.ClusterIdentity

for _, vs := range input {
v := vs.(map[string]interface{})

storageAccountKey := v["storage_account_key"].(string)
storageContainerId := v["storage_container_id"].(string)

storageContainerID := v["storage_container_id"].(string)

fileSystemID := v["filesystem_id"].(string)
storageResourceID := v["storage_resource_id"].(string)
managedIdentityResourceID := v["managed_identity_resource_id"].(string)

isDefault := v["is_default"].(bool)

// https://foo.blob.core.windows.net/example
uri, err := url.Parse(storageContainerId)
if err != nil {
return nil, fmt.Errorf("Error parsing %q: %s", storageContainerId, err)
}
if fileSystemID == "" && storageResourceID == "" && managedIdentityResourceID == "" && storageContainerID != "" && storageAccountKey != "" {
uri, err := url.Parse(storageContainerID)
if err != nil {
return nil, nil, fmt.Errorf("Error parsing %q: %s", storageContainerID, err)
}

result := hdinsight.StorageAccount{
Name: utils.String(uri.Host),
Container: utils.String(strings.TrimPrefix(uri.Path, "/")),
Key: utils.String(storageAccountKey),
IsDefault: utils.Bool(isDefault),
}
results = append(results, result)
} else if fileSystemID != "" && storageResourceID != "" && managedIdentityResourceID != "" && storageContainerID == "" && storageAccountKey == "" {
uri, err := url.Parse(fileSystemID)
if err != nil {
return nil, nil, fmt.Errorf("Error parsing %q: %s", fileSystemID, err)
}

if clusterIdentity == nil {
clusterIdentity = &hdinsight.ClusterIdentity{
Type: hdinsight.UserAssigned,
UserAssignedIdentities: make(map[string]*hdinsight.ClusterIdentityUserAssignedIdentitiesValue),
}
}

// ... API doesn't seem to require client_id or principal_id, so pass in an empty ClusterIdentityUserAssignedIdentitiesValue
clusterIdentity.UserAssignedIdentities[managedIdentityResourceID] = &hdinsight.ClusterIdentityUserAssignedIdentitiesValue{}

result := hdinsight.StorageAccount{
Name: utils.String(uri.Host),
Container: utils.String(strings.TrimPrefix(uri.Path, "/")),
Key: utils.String(storageAccountKey),
IsDefault: utils.Bool(isDefault),
result := hdinsight.StorageAccount{
Name: utils.String(uri.Host), // https://storageaccountname.dfs.core.windows.net/filesystemname -> storageaccountname.dfs.core.windows.net
ResourceID: utils.String(storageResourceID),
FileSystem: utils.String(uri.Path[1:]), // https://storageaccountname.dfs.core.windows.net/filesystemname -> filesystemname
MsiResourceID: utils.String(managedIdentityResourceID),
IsDefault: utils.Bool(isDefault),
}
results = append(results, result)
} else {
return nil, nil, fmt.Errorf(`specify either storage_container_id AND storage_account_key (for WASB blob storage), ` +
`or filesystem_id AND storage_resource_id AND managed_identity_resource_id (for Data Lake Storage Gen 2)`)
}
results = append(results, result)
}

return &results, nil
return &results, clusterIdentity, nil
}

type HDInsightNodeDefinition struct {
5 changes: 3 additions & 2 deletions azurerm/resource_arm_hdinsight_hadoop_cluster.go
@@ -138,7 +138,7 @@ func resourceArmHDInsightHadoopClusterCreate(d *schema.ResourceData, meta interf
gateway := azure.ExpandHDInsightsConfigurations(gatewayRaw)

storageAccountsRaw := d.Get("storage_account").([]interface{})
storageAccounts, err := azure.ExpandHDInsightsStorageAccounts(storageAccountsRaw)
storageAccounts, identity, err := azure.ExpandHDInsightsStorageAccounts(storageAccountsRaw)
if err != nil {
return fmt.Errorf("Error expanding `storage_account`: %s", err)
}
@@ -185,7 +185,8 @@ func resourceArmHDInsightHadoopClusterCreate(d *schema.ResourceData, meta interf
Roles: roles,
},
},
Tags: tags.Expand(t),
Tags: tags.Expand(t),
Identity: identity,
}
future, err := client.Create(ctx, resourceGroup, name, params)
if err != nil {
5 changes: 3 additions & 2 deletions azurerm/resource_arm_hdinsight_hbase_cluster.go
@@ -136,7 +136,7 @@ func resourceArmHDInsightHBaseClusterCreate(d *schema.ResourceData, meta interfa
gateway := azure.ExpandHDInsightsConfigurations(gatewayRaw)

storageAccountsRaw := d.Get("storage_account").([]interface{})
storageAccounts, err := azure.ExpandHDInsightsStorageAccounts(storageAccountsRaw)
storageAccounts, identity, err := azure.ExpandHDInsightsStorageAccounts(storageAccountsRaw)
if err != nil {
return fmt.Errorf("Error expanding `storage_account`: %s", err)
}
@@ -183,7 +183,8 @@ func resourceArmHDInsightHBaseClusterCreate(d *schema.ResourceData, meta interfa
Roles: roles,
},
},
Tags: tags.Expand(t),
Tags: tags.Expand(t),
Identity: identity,
}
future, err := client.Create(ctx, resourceGroup, name, params)
if err != nil {
5 changes: 3 additions & 2 deletions azurerm/resource_arm_hdinsight_interactive_query_cluster.go
@@ -136,7 +136,7 @@ func resourceArmHDInsightInteractiveQueryClusterCreate(d *schema.ResourceData, m
gateway := azure.ExpandHDInsightsConfigurations(gatewayRaw)

storageAccountsRaw := d.Get("storage_account").([]interface{})
storageAccounts, err := azure.ExpandHDInsightsStorageAccounts(storageAccountsRaw)
storageAccounts, identity, err := azure.ExpandHDInsightsStorageAccounts(storageAccountsRaw)
if err != nil {
return fmt.Errorf("Error expanding `storage_account`: %s", err)
}
@@ -183,7 +183,8 @@ func resourceArmHDInsightInteractiveQueryClusterCreate(d *schema.ResourceData, m
Roles: roles,
},
},
Tags: tags.Expand(t),
Tags: tags.Expand(t),
Identity: identity,
}
future, err := client.Create(ctx, resourceGroup, name, params)
if err != nil {
5 changes: 3 additions & 2 deletions azurerm/resource_arm_hdinsight_kafka_cluster.go
@@ -137,7 +137,7 @@ func resourceArmHDInsightKafkaClusterCreate(d *schema.ResourceData, meta interfa
gateway := azure.ExpandHDInsightsConfigurations(gatewayRaw)

storageAccountsRaw := d.Get("storage_account").([]interface{})
storageAccounts, err := azure.ExpandHDInsightsStorageAccounts(storageAccountsRaw)
storageAccounts, identity, err := azure.ExpandHDInsightsStorageAccounts(storageAccountsRaw)
if err != nil {
return fmt.Errorf("Error expanding `storage_account`: %s", err)
}
@@ -184,7 +184,8 @@ func resourceArmHDInsightKafkaClusterCreate(d *schema.ResourceData, meta interfa
Roles: roles,
},
},
Tags: tags.Expand(t),
Tags: tags.Expand(t),
Identity: identity,
}
future, err := client.Create(ctx, resourceGroup, name, params)
if err != nil {
5 changes: 3 additions & 2 deletions azurerm/resource_arm_hdinsight_ml_services_cluster.go
@@ -153,7 +153,7 @@ func resourceArmHDInsightMLServicesClusterCreate(d *schema.ResourceData, meta in
gateway := expandHDInsightsMLServicesConfigurations(gatewayRaw, rStudio)

storageAccountsRaw := d.Get("storage_account").([]interface{})
storageAccounts, err := azure.ExpandHDInsightsStorageAccounts(storageAccountsRaw)
storageAccounts, identity, err := azure.ExpandHDInsightsStorageAccounts(storageAccountsRaw)
if err != nil {
return fmt.Errorf("Error expanding `storage_account`: %s", err)
}
@@ -200,7 +200,8 @@ func resourceArmHDInsightMLServicesClusterCreate(d *schema.ResourceData, meta in
Roles: roles,
},
},
Tags: tags.Expand(t),
Tags: tags.Expand(t),
Identity: identity,
}
future, err := client.Create(ctx, resourceGroup, name, params)
if err != nil {
5 changes: 3 additions & 2 deletions azurerm/resource_arm_hdinsight_rserver_cluster.go
@@ -153,7 +153,7 @@ func resourceArmHDInsightRServerClusterCreate(d *schema.ResourceData, meta inter
gateway := expandHDInsightsRServerConfigurations(gatewayRaw, rStudio)

storageAccountsRaw := d.Get("storage_account").([]interface{})
storageAccounts, err := azure.ExpandHDInsightsStorageAccounts(storageAccountsRaw)
storageAccounts, identity, err := azure.ExpandHDInsightsStorageAccounts(storageAccountsRaw)
if err != nil {
return fmt.Errorf("Error expanding `storage_account`: %s", err)
}
@@ -200,7 +200,8 @@ func resourceArmHDInsightRServerClusterCreate(d *schema.ResourceData, meta inter
Roles: roles,
},
},
Tags: tags.Expand(t),
Tags: tags.Expand(t),
Identity: identity,
}
future, err := client.Create(ctx, resourceGroup, name, params)
if err != nil {
5 changes: 3 additions & 2 deletions azurerm/resource_arm_hdinsight_spark_cluster.go
@@ -136,7 +136,7 @@ func resourceArmHDInsightSparkClusterCreate(d *schema.ResourceData, meta interfa
gateway := azure.ExpandHDInsightsConfigurations(gatewayRaw)

storageAccountsRaw := d.Get("storage_account").([]interface{})
storageAccounts, err := azure.ExpandHDInsightsStorageAccounts(storageAccountsRaw)
storageAccounts, identity, err := azure.ExpandHDInsightsStorageAccounts(storageAccountsRaw)
if err != nil {
return fmt.Errorf("Error expanding `storage_account`: %s", err)
}
@@ -183,7 +183,8 @@ func resourceArmHDInsightSparkClusterCreate(d *schema.ResourceData, meta interfa
Roles: roles,
},
},
Tags: tags.Expand(t),
Tags: tags.Expand(t),
Identity: identity,
}
future, err := client.Create(ctx, resourceGroup, name, params)
if err != nil {
5 changes: 3 additions & 2 deletions azurerm/resource_arm_hdinsight_storm_cluster.go
@@ -137,7 +137,7 @@ func resourceArmHDInsightStormClusterCreate(d *schema.ResourceData, meta interfa
gateway := azure.ExpandHDInsightsConfigurations(gatewayRaw)

storageAccountsRaw := d.Get("storage_account").([]interface{})
storageAccounts, err := azure.ExpandHDInsightsStorageAccounts(storageAccountsRaw)
storageAccounts, identity, err := azure.ExpandHDInsightsStorageAccounts(storageAccountsRaw)
if err != nil {
return fmt.Errorf("Error expanding `storage_account`: %s", err)
}
@@ -184,7 +184,8 @@ func resourceArmHDInsightStormClusterCreate(d *schema.ResourceData, meta interfa
Roles: roles,
},
},
Tags: tags.Expand(t),
Tags: tags.Expand(t),
Identity: identity,
}
future, err := client.Create(ctx, resourceGroup, name, params)
if err != nil {
10 changes: 8 additions & 2 deletions website/docs/r/hdinsight_hadoop_cluster.html.markdown
@@ -161,12 +161,18 @@ A `storage_account` block supports the following:

-> **NOTE:** One of the `storage_account` blocks must be marked as the default.

* `storage_account_key` - (Required) The Access Key which should be used to connect to the Storage Account. Changing this forces a new resource to be created.
* `storage_account_key` - (Required for Blob storage) The Access Key which should be used to connect to the Storage Account. Changing this forces a new resource to be created.

* `storage_container_id` - (Required) The ID of the Storage Container. Changing this forces a new resource to be created.
* `storage_container_id` - (Required for Blob storage) The ID of the Storage Container. Changing this forces a new resource to be created.

-> **NOTE:** This can be obtained from the `id` of the `azurerm_storage_container` resource.

* `storage_resource_id` - (Required for Gen2 storage) The resource ID of the Storage Account. Changing this forces a new resource to be created.

* `filesystem_id` - (Required for Gen2 storage) The ID of the Gen2 filesystem. See `azurerm_storage_data_lake_gen2_filesystem`. Changing this forces a new resource to be created.

* `managed_identity_resource_id` - (Required for Gen2 storage) The resource ID of the managed identity used to access the Gen2 filesystem (see the example below). Changing this forces a new resource to be created.
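
A minimal sketch of a Gen2-backed `storage_account` block together with the supporting resources it references, as it might appear inside an `azurerm_hdinsight_hadoop_cluster` resource (names are illustrative; the storage account is assumed to have `is_hns_enabled = true`, and the identity typically also needs a data-plane role such as `Storage Blob Data Owner` on it):

```hcl
resource "azurerm_user_assigned_identity" "example" {
  name                = "example-hdinsight-identity"
  resource_group_name = azurerm_resource_group.example.name
  location            = azurerm_resource_group.example.location
}

resource "azurerm_storage_data_lake_gen2_filesystem" "example" {
  name               = "example"
  storage_account_id = azurerm_storage_account.example.id
}

# Inside the azurerm_hdinsight_hadoop_cluster resource:
storage_account {
  storage_resource_id          = azurerm_storage_account.example.id
  filesystem_id                = azurerm_storage_data_lake_gen2_filesystem.example.id
  managed_identity_resource_id = azurerm_user_assigned_identity.example.id
  is_default                   = true
}
```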

---

A `worker_node` block supports the following:
