bigquery connection - spark connection type (#7498) (#16677)
[upstream:2717aaf1a1716a081efbd63fd6d46fb8260e97d1]

Signed-off-by: Modular Magician <[email protected]>
modular-magician authored Dec 5, 2023
1 parent 3031a15 commit 87d0dab
Showing 4 changed files with 351 additions and 5 deletions.
3 changes: 3 additions & 0 deletions .changelog/7498.txt
@@ -0,0 +1,3 @@
```release-note:enhancement
bigqueryconnection - add `spark` support
```
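For orientation, here is a minimal sketch of a Terraform configuration that exercises the new block, written as a Go raw-string constant in the same style the generated acceptance test below uses. The connection ID, location, and cluster reference are illustrative assumptions, not values taken from this commit.

```go
package main

import "fmt"

// sparkConnectionConfig is a hypothetical minimal configuration for the new
// spark connection type. Exactly one connection-type block (cloud_sql, aws,
// azure, cloud_spanner, cloud_resource, or spark) may be set per resource.
const sparkConnectionConfig = `
resource "google_bigquery_connection" "connection" {
  connection_id = "my-spark-connection"
  location      = "US"
  spark {
    spark_history_server_config {
      dataproc_cluster = google_dataproc_cluster.basic.id
    }
  }
}
`

func main() {
	fmt.Print(sparkConnectionConfig)
}
```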
213 changes: 208 additions & 5 deletions google/services/bigqueryconnection/resource_bigquery_connection.go
@@ -83,7 +83,7 @@ func ResourceBigqueryConnectionConnection() *schema.Resource {
},
},
},
ExactlyOneOf: []string{"cloud_sql", "aws", "azure", "cloud_spanner", "cloud_resource"},
ExactlyOneOf: []string{"cloud_sql", "aws", "azure", "cloud_spanner", "cloud_resource", "spark"},
},
"azure": {
Type: schema.TypeList,
@@ -129,7 +129,7 @@ func ResourceBigqueryConnectionConnection() *schema.Resource {
},
},
},
ExactlyOneOf: []string{"cloud_sql", "aws", "azure", "cloud_spanner", "cloud_resource"},
ExactlyOneOf: []string{"cloud_sql", "aws", "azure", "cloud_spanner", "cloud_resource", "spark"},
},
"cloud_resource": {
Type: schema.TypeList,
@@ -145,7 +145,7 @@ func ResourceBigqueryConnectionConnection() *schema.Resource {
},
},
},
ExactlyOneOf: []string{"cloud_sql", "aws", "azure", "cloud_spanner", "cloud_resource"},
ExactlyOneOf: []string{"cloud_sql", "aws", "azure", "cloud_spanner", "cloud_resource", "spark"},
},
"cloud_spanner": {
Type: schema.TypeList,
@@ -190,7 +190,7 @@ func ResourceBigqueryConnectionConnection() *schema.Resource {
},
},
},
ExactlyOneOf: []string{"cloud_sql", "aws", "azure", "cloud_spanner", "cloud_resource"},
ExactlyOneOf: []string{"cloud_sql", "aws", "azure", "cloud_spanner", "cloud_resource", "spark"},
},
"cloud_sql": {
Type: schema.TypeList,
@@ -243,7 +243,7 @@ func ResourceBigqueryConnectionConnection() *schema.Resource {
},
},
},
ExactlyOneOf: []string{"cloud_sql", "aws", "azure", "cloud_spanner", "cloud_resource"},
ExactlyOneOf: []string{"cloud_sql", "aws", "azure", "cloud_spanner", "cloud_resource", "spark"},
},
"connection_id": {
Type: schema.TypeString,
@@ -274,6 +274,52 @@ Spanner Connections same as spanner region
AWS allowed regions are aws-us-east-1
Azure allowed regions are azure-eastus2`,
},
"spark": {
Type: schema.TypeList,
Optional: true,
Description: `Container for connection properties to execute stored procedures for Apache Spark.`,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"metastore_service_config": {
Type: schema.TypeList,
Optional: true,
Description: `Dataproc Metastore Service configuration for the connection.`,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"metastore_service": {
Type: schema.TypeString,
Optional: true,
Description: `Resource name of an existing Dataproc Metastore service in the form of projects/[projectId]/locations/[region]/services/[serviceId].`,
},
},
},
},
"spark_history_server_config": {
Type: schema.TypeList,
Optional: true,
Description: `Spark History Server configuration for the connection.`,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"dataproc_cluster": {
Type: schema.TypeString,
Optional: true,
Description: `Resource name of an existing Dataproc Cluster to act as a Spark History Server for the connection, in the form of projects/[projectId]/regions/[region]/clusters/[cluster_name].`,
},
},
},
},
"service_account_id": {
Type: schema.TypeString,
Computed: true,
Description: `The account ID of the service created for the purpose of this connection.`,
},
},
},
ExactlyOneOf: []string{"cloud_sql", "aws", "azure", "cloud_spanner", "cloud_resource", "spark"},
},
"has_credential": {
Type: schema.TypeBool,
Computed: true,
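The same ExactlyOneOf list is repeated on every connection-type block above, with `spark` now appended to each; the invariant it enforces is that exactly one of those blocks is configured per resource. A self-contained sketch of that check in plain Go follows (the SDK performs this validation itself; `exactlyOne` is an illustrative helper, not a provider function):

```go
package main

import "fmt"

// exactlyOne returns an error unless precisely one of the given keys is set,
// mirroring the semantics of the schema's ExactlyOneOf constraint.
func exactlyOne(configured map[string]bool, keys ...string) error {
	n := 0
	for _, k := range keys {
		if configured[k] {
			n++
		}
	}
	if n != 1 {
		return fmt.Errorf("exactly one of %v must be specified, got %d", keys, n)
	}
	return nil
}

func main() {
	configured := map[string]bool{"spark": true}
	err := exactlyOne(configured,
		"cloud_sql", "aws", "azure", "cloud_spanner", "cloud_resource", "spark")
	fmt.Println(err) // <nil>
}
```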
@@ -352,6 +398,12 @@ func resourceBigqueryConnectionConnectionCreate(d *schema.ResourceData, meta int
} else if v, ok := d.GetOkExists("cloud_resource"); ok || !reflect.DeepEqual(v, cloudResourceProp) {
obj["cloudResource"] = cloudResourceProp
}
sparkProp, err := expandBigqueryConnectionConnectionSpark(d.Get("spark"), d, config)
if err != nil {
return err
} else if v, ok := d.GetOkExists("spark"); ok || !reflect.DeepEqual(v, sparkProp) {
obj["spark"] = sparkProp
}

obj, err = resourceBigqueryConnectionConnectionEncoder(d, meta, obj)
if err != nil {
@@ -490,6 +542,9 @@ func resourceBigqueryConnectionConnectionRead(d *schema.ResourceData, meta inter
if err := d.Set("cloud_resource", flattenBigqueryConnectionConnectionCloudResource(res["cloudResource"], d, config)); err != nil {
return fmt.Errorf("Error reading Connection: %s", err)
}
if err := d.Set("spark", flattenBigqueryConnectionConnectionSpark(res["spark"], d, config)); err != nil {
return fmt.Errorf("Error reading Connection: %s", err)
}

return nil
}
@@ -552,6 +607,12 @@ func resourceBigqueryConnectionConnectionUpdate(d *schema.ResourceData, meta int
} else if v, ok := d.GetOkExists("cloud_resource"); ok || !reflect.DeepEqual(v, cloudResourceProp) {
obj["cloudResource"] = cloudResourceProp
}
sparkProp, err := expandBigqueryConnectionConnectionSpark(d.Get("spark"), d, config)
if err != nil {
return err
} else if v, ok := d.GetOkExists("spark"); ok || !reflect.DeepEqual(v, sparkProp) {
obj["spark"] = sparkProp
}

obj, err = resourceBigqueryConnectionConnectionEncoder(d, meta, obj)
if err != nil {
@@ -594,6 +655,10 @@ func resourceBigqueryConnectionConnectionUpdate(d *schema.ResourceData, meta int
if d.HasChange("cloud_resource") {
updateMask = append(updateMask, "cloudResource")
}

if d.HasChange("spark") {
updateMask = append(updateMask, "spark")
}
// updateMask is a URL parameter but not present in the schema, so ReplaceVars
// won't set it
url, err = transport_tpg.AddQueryParams(url, map[string]string{"updateMask": strings.Join(updateMask, ",")})
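As the comment above notes, updateMask is a query parameter rather than a schema field. The following sketch shows the URL that results when only the spark block changed, using net/url directly in place of the provider's transport_tpg.AddQueryParams helper; the project, location, and connection names are placeholders:

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

func main() {
	updateMask := []string{"spark"} // fields flagged by d.HasChange above

	// Append the comma-joined mask as a query parameter, as the provider's
	// AddQueryParams helper does for the PATCH request URL.
	u, _ := url.Parse("https://bigqueryconnection.googleapis.com/v1/projects/my-project/locations/US/connections/my-connection")
	q := u.Query()
	q.Set("updateMask", strings.Join(updateMask, ","))
	u.RawQuery = q.Encode()

	fmt.Println(u.String())
	// .../connections/my-connection?updateMask=spark
}
```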
@@ -927,6 +992,61 @@ func flattenBigqueryConnectionConnectionCloudResourceServiceAccountId(v interfac
return v
}

func flattenBigqueryConnectionConnectionSpark(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
if v == nil {
return nil
}
original := v.(map[string]interface{})
if len(original) == 0 {
return nil
}
transformed := make(map[string]interface{})
transformed["service_account_id"] =
flattenBigqueryConnectionConnectionSparkServiceAccountId(original["serviceAccountId"], d, config)
transformed["metastore_service_config"] =
flattenBigqueryConnectionConnectionSparkMetastoreServiceConfig(original["metastoreServiceConfig"], d, config)
transformed["spark_history_server_config"] =
flattenBigqueryConnectionConnectionSparkSparkHistoryServerConfig(original["sparkHistoryServerConfig"], d, config)
return []interface{}{transformed}
}
func flattenBigqueryConnectionConnectionSparkServiceAccountId(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
return v
}

func flattenBigqueryConnectionConnectionSparkMetastoreServiceConfig(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
if v == nil {
return nil
}
original := v.(map[string]interface{})
if len(original) == 0 {
return nil
}
transformed := make(map[string]interface{})
transformed["metastore_service"] =
flattenBigqueryConnectionConnectionSparkMetastoreServiceConfigMetastoreService(original["metastoreService"], d, config)
return []interface{}{transformed}
}
func flattenBigqueryConnectionConnectionSparkMetastoreServiceConfigMetastoreService(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
return v
}

func flattenBigqueryConnectionConnectionSparkSparkHistoryServerConfig(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
if v == nil {
return nil
}
original := v.(map[string]interface{})
if len(original) == 0 {
return nil
}
transformed := make(map[string]interface{})
transformed["dataproc_cluster"] =
flattenBigqueryConnectionConnectionSparkSparkHistoryServerConfigDataprocCluster(original["dataprocCluster"], d, config)
return []interface{}{transformed}
}
func flattenBigqueryConnectionConnectionSparkSparkHistoryServerConfigDataprocCluster(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
return v
}

func expandBigqueryConnectionConnectionConnectionId(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
return v, nil
}
@@ -1279,6 +1399,89 @@ func expandBigqueryConnectionConnectionCloudResourceServiceAccountId(v interface
return v, nil
}

func expandBigqueryConnectionConnectionSpark(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
l := v.([]interface{})
if len(l) == 0 || l[0] == nil {
return nil, nil
}
raw := l[0]
original := raw.(map[string]interface{})
transformed := make(map[string]interface{})

transformedServiceAccountId, err := expandBigqueryConnectionConnectionSparkServiceAccountId(original["service_account_id"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedServiceAccountId); val.IsValid() && !tpgresource.IsEmptyValue(val) {
transformed["serviceAccountId"] = transformedServiceAccountId
}

transformedMetastoreServiceConfig, err := expandBigqueryConnectionConnectionSparkMetastoreServiceConfig(original["metastore_service_config"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedMetastoreServiceConfig); val.IsValid() && !tpgresource.IsEmptyValue(val) {
transformed["metastoreServiceConfig"] = transformedMetastoreServiceConfig
}

transformedSparkHistoryServerConfig, err := expandBigqueryConnectionConnectionSparkSparkHistoryServerConfig(original["spark_history_server_config"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedSparkHistoryServerConfig); val.IsValid() && !tpgresource.IsEmptyValue(val) {
transformed["sparkHistoryServerConfig"] = transformedSparkHistoryServerConfig
}

return transformed, nil
}

func expandBigqueryConnectionConnectionSparkServiceAccountId(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
return v, nil
}

func expandBigqueryConnectionConnectionSparkMetastoreServiceConfig(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
l := v.([]interface{})
if len(l) == 0 || l[0] == nil {
return nil, nil
}
raw := l[0]
original := raw.(map[string]interface{})
transformed := make(map[string]interface{})

transformedMetastoreService, err := expandBigqueryConnectionConnectionSparkMetastoreServiceConfigMetastoreService(original["metastore_service"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedMetastoreService); val.IsValid() && !tpgresource.IsEmptyValue(val) {
transformed["metastoreService"] = transformedMetastoreService
}

return transformed, nil
}

func expandBigqueryConnectionConnectionSparkMetastoreServiceConfigMetastoreService(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
return v, nil
}

func expandBigqueryConnectionConnectionSparkSparkHistoryServerConfig(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
l := v.([]interface{})
if len(l) == 0 || l[0] == nil {
return nil, nil
}
raw := l[0]
original := raw.(map[string]interface{})
transformed := make(map[string]interface{})

transformedDataprocCluster, err := expandBigqueryConnectionConnectionSparkSparkHistoryServerConfigDataprocCluster(original["dataproc_cluster"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedDataprocCluster); val.IsValid() && !tpgresource.IsEmptyValue(val) {
transformed["dataprocCluster"] = transformedDataprocCluster
}

return transformed, nil
}

func expandBigqueryConnectionConnectionSparkSparkHistoryServerConfigDataprocCluster(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
return v, nil
}

func resourceBigqueryConnectionConnectionEncoder(d *schema.ResourceData, meta interface{}, obj map[string]interface{}) (map[string]interface{}, error) {
// connection_id is needed to qualify the URL but cannot be sent in the body
delete(obj, "connection_id")
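To make the wire format concrete: the expand path above translates the snake_case `spark` block into the camelCase fields of the BigQuery Connection API, dropping empty sub-blocks. Below is a simplified, self-contained sketch of that payload shape using plain maps instead of the provider's tpgresource helpers; the project, service, and cluster names are placeholders:

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Shape produced by expandBigqueryConnectionConnectionSpark: Terraform's
	// metastore_service_config / spark_history_server_config blocks become
	// the API's metastoreServiceConfig / sparkHistoryServerConfig objects.
	spark := map[string]interface{}{
		"metastoreServiceConfig": map[string]interface{}{
			"metastoreService": "projects/my-project/locations/us-central1/services/my-metastore",
		},
		"sparkHistoryServerConfig": map[string]interface{}{
			"dataprocCluster": "projects/my-project/regions/us-central1/clusters/my-cluster",
		},
	}

	body, _ := json.MarshalIndent(map[string]interface{}{"spark": spark}, "", "  ")
	fmt.Println(string(body)) // request body fragment for connections.create / patch
}
```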
@@ -405,6 +405,69 @@ resource "google_bigquery_connection" "connection" {
`, context)
}

func TestAccBigqueryConnectionConnection_bigqueryConnectionSparkExample(t *testing.T) {
t.Parallel()

context := map[string]interface{}{
"random_suffix": acctest.RandString(t, 10),
}

acctest.VcrTest(t, resource.TestCase{
PreCheck: func() { acctest.AccTestPreCheck(t) },
ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories(t),
CheckDestroy: testAccCheckBigqueryConnectionConnectionDestroyProducer(t),
Steps: []resource.TestStep{
{
Config: testAccBigqueryConnectionConnection_bigqueryConnectionSparkExample(context),
},
{
ResourceName: "google_bigquery_connection.connection",
ImportState: true,
ImportStateVerify: true,
ImportStateVerifyIgnore: []string{"location"},
},
},
})
}

func testAccBigqueryConnectionConnection_bigqueryConnectionSparkExample(context map[string]interface{}) string {
return acctest.Nprintf(`
resource "google_bigquery_connection" "connection" {
connection_id = "tf-test-my-connection%{random_suffix}"
location = "US"
friendly_name = "👋"
description = "a riveting description"
spark {
spark_history_server_config {
dataproc_cluster = google_dataproc_cluster.basic.id
}
}
}
resource "google_dataproc_cluster" "basic" {
name = "tf-test-my-connection%{random_suffix}"
region = "us-central1"
cluster_config {
# Keep the costs down with smallest config we can get away with
software_config {
override_properties = {
"dataproc:dataproc.allow.zero.workers" = "true"
}
}
master_config {
num_instances = 1
machine_type = "e2-standard-2"
disk_config {
boot_disk_size_gb = 35
}
}
}
}
`, context)
}

func testAccCheckBigqueryConnectionConnectionDestroyProducer(t *testing.T) func(s *terraform.State) error {
return func(s *terraform.State) error {
for name, rs := range s.RootModule().Resources {