Skip to content

Commit

Permalink
Add support for Stored Procedure for Apache Spark (#9793) (#17028)
Browse files Browse the repository at this point in the history
* impl for #16953

* modified format in example

* modified tab to space

* added test for update and test for coverage

* mofify all resource in the spark option section

* modified connection id to avoid conflict
[upstream:5f50107bb30977b84230a3c526f2f9b404a40684]

Signed-off-by: Modular Magician <[email protected]>
  • Loading branch information
modular-magician authored Jan 17, 2024
1 parent 0d60922 commit 1510da6
Show file tree
Hide file tree
Showing 5 changed files with 756 additions and 3 deletions.
3 changes: 3 additions & 0 deletions .changelog/9793.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
bigquery: add `spark_options` field to `google_bigquery_routine` resource
```
303 changes: 301 additions & 2 deletions google/services/bigquery/resource_bigquery_routine.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,8 @@ imported JAVASCRIPT libraries.`,
"language": {
Type: schema.TypeString,
Optional: true,
ValidateFunc: verify.ValidateEnum([]string{"SQL", "JAVASCRIPT", ""}),
Description: `The language of the routine. Possible values: ["SQL", "JAVASCRIPT"]`,
ValidateFunc: verify.ValidateEnum([]string{"SQL", "JAVASCRIPT", "PYTHON", "JAVA", "SCALA", ""}),
Description: `The language of the routine. Possible values: ["SQL", "JAVASCRIPT", "PYTHON", "JAVA", "SCALA"]`,
},
"return_table_type": {
Type: schema.TypeString,
Expand Down Expand Up @@ -176,6 +176,90 @@ d the order of values or replaced STRUCT field type with RECORD field type, we c
cannot suppress the recurring diff this causes. As a workaround, we recommend using
the schema as returned by the API.`,
},
"spark_options": {
Type: schema.TypeList,
Optional: true,
Description: `Optional. If language is one of "PYTHON", "JAVA", "SCALA", this field stores the options for spark stored procedure.`,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"archive_uris": {
Type: schema.TypeList,
Computed: true,
Optional: true,
Description: `Archive files to be extracted into the working directory of each executor. For more information about Apache Spark, see Apache Spark.`,
Elem: &schema.Schema{
Type: schema.TypeString,
},
},
"connection": {
Type: schema.TypeString,
Optional: true,
Description: `Fully qualified name of the user-provided Spark connection object.
Format: "projects/{projectId}/locations/{locationId}/connections/{connectionId}"`,
},
"container_image": {
Type: schema.TypeString,
Optional: true,
Description: `Custom container image for the runtime environment.`,
},
"file_uris": {
Type: schema.TypeList,
Computed: true,
Optional: true,
Description: `Files to be placed in the working directory of each executor. For more information about Apache Spark, see Apache Spark.`,
Elem: &schema.Schema{
Type: schema.TypeString,
},
},
"jar_uris": {
Type: schema.TypeList,
Computed: true,
Optional: true,
Description: `JARs to include on the driver and executor CLASSPATH. For more information about Apache Spark, see Apache Spark.`,
Elem: &schema.Schema{
Type: schema.TypeString,
},
},
"main_class": {
Type: schema.TypeString,
Optional: true,
Description: `The fully qualified name of a class in jarUris, for example, com.example.wordcount.
Exactly one of mainClass and main_jar_uri field should be set for Java/Scala language type.`,
},
"main_file_uri": {
Type: schema.TypeString,
Optional: true,
Description: `The main file/jar URI of the Spark application.
Exactly one of the definitionBody field and the mainFileUri field must be set for Python.
Exactly one of mainClass and mainFileUri field should be set for Java/Scala language type.`,
},
"properties": {
Type: schema.TypeMap,
Computed: true,
Optional: true,
Description: `Configuration properties as a set of key/value pairs, which will be passed on to the Spark application.
For more information, see Apache Spark and the procedure option list.
An object containing a list of "key": value pairs. Example: { "name": "wrench", "mass": "1.3kg", "count": "3" }.`,
Elem: &schema.Schema{Type: schema.TypeString},
},
"py_file_uris": {
Type: schema.TypeList,
Computed: true,
Optional: true,
Description: `Python files to be placed on the PYTHONPATH for PySpark application. Supported file types: .py, .egg, and .zip. For more information about Apache Spark, see Apache Spark.`,
Elem: &schema.Schema{
Type: schema.TypeString,
},
},
"runtime_version": {
Type: schema.TypeString,
Optional: true,
Description: `Runtime version. If not specified, the default runtime version is used.`,
},
},
},
},
"creation_time": {
Type: schema.TypeInt,
Computed: true,
Expand Down Expand Up @@ -267,6 +351,12 @@ func resourceBigQueryRoutineCreate(d *schema.ResourceData, meta interface{}) err
} else if v, ok := d.GetOkExists("determinism_level"); !tpgresource.IsEmptyValue(reflect.ValueOf(determinismLevelProp)) && (ok || !reflect.DeepEqual(v, determinismLevelProp)) {
obj["determinismLevel"] = determinismLevelProp
}
sparkOptionsProp, err := expandBigQueryRoutineSparkOptions(d.Get("spark_options"), d, config)
if err != nil {
return err
} else if v, ok := d.GetOkExists("spark_options"); !tpgresource.IsEmptyValue(reflect.ValueOf(sparkOptionsProp)) && (ok || !reflect.DeepEqual(v, sparkOptionsProp)) {
obj["sparkOptions"] = sparkOptionsProp
}

url, err := tpgresource.ReplaceVars(d, config, "{{BigQueryBasePath}}projects/{{project}}/datasets/{{dataset_id}}/routines")
if err != nil {
Expand Down Expand Up @@ -400,6 +490,9 @@ func resourceBigQueryRoutineRead(d *schema.ResourceData, meta interface{}) error
if err := d.Set("determinism_level", flattenBigQueryRoutineDeterminismLevel(res["determinismLevel"], d, config)); err != nil {
return fmt.Errorf("Error reading Routine: %s", err)
}
if err := d.Set("spark_options", flattenBigQueryRoutineSparkOptions(res["sparkOptions"], d, config)); err != nil {
return fmt.Errorf("Error reading Routine: %s", err)
}

return nil
}
Expand Down Expand Up @@ -480,6 +573,12 @@ func resourceBigQueryRoutineUpdate(d *schema.ResourceData, meta interface{}) err
} else if v, ok := d.GetOkExists("determinism_level"); !tpgresource.IsEmptyValue(reflect.ValueOf(v)) && (ok || !reflect.DeepEqual(v, determinismLevelProp)) {
obj["determinismLevel"] = determinismLevelProp
}
sparkOptionsProp, err := expandBigQueryRoutineSparkOptions(d.Get("spark_options"), d, config)
if err != nil {
return err
} else if v, ok := d.GetOkExists("spark_options"); !tpgresource.IsEmptyValue(reflect.ValueOf(v)) && (ok || !reflect.DeepEqual(v, sparkOptionsProp)) {
obj["sparkOptions"] = sparkOptionsProp
}

url, err := tpgresource.ReplaceVars(d, config, "{{BigQueryBasePath}}projects/{{project}}/datasets/{{dataset_id}}/routines/{{routine_id}}")
if err != nil {
Expand Down Expand Up @@ -727,6 +826,77 @@ func flattenBigQueryRoutineDeterminismLevel(v interface{}, d *schema.ResourceDat
return v
}

func flattenBigQueryRoutineSparkOptions(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
if v == nil {
return nil
}
original := v.(map[string]interface{})
if len(original) == 0 {
return nil
}
transformed := make(map[string]interface{})
transformed["connection"] =
flattenBigQueryRoutineSparkOptionsConnection(original["connection"], d, config)
transformed["runtime_version"] =
flattenBigQueryRoutineSparkOptionsRuntimeVersion(original["runtimeVersion"], d, config)
transformed["container_image"] =
flattenBigQueryRoutineSparkOptionsContainerImage(original["containerImage"], d, config)
transformed["properties"] =
flattenBigQueryRoutineSparkOptionsProperties(original["properties"], d, config)
transformed["main_file_uri"] =
flattenBigQueryRoutineSparkOptionsMainFileUri(original["mainFileUri"], d, config)
transformed["py_file_uris"] =
flattenBigQueryRoutineSparkOptionsPyFileUris(original["pyFileUris"], d, config)
transformed["jar_uris"] =
flattenBigQueryRoutineSparkOptionsJarUris(original["jarUris"], d, config)
transformed["file_uris"] =
flattenBigQueryRoutineSparkOptionsFileUris(original["fileUris"], d, config)
transformed["archive_uris"] =
flattenBigQueryRoutineSparkOptionsArchiveUris(original["archiveUris"], d, config)
transformed["main_class"] =
flattenBigQueryRoutineSparkOptionsMainClass(original["mainClass"], d, config)
return []interface{}{transformed}
}
func flattenBigQueryRoutineSparkOptionsConnection(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
return v
}

func flattenBigQueryRoutineSparkOptionsRuntimeVersion(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
return v
}

func flattenBigQueryRoutineSparkOptionsContainerImage(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
return v
}

func flattenBigQueryRoutineSparkOptionsProperties(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
return v
}

func flattenBigQueryRoutineSparkOptionsMainFileUri(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
return v
}

func flattenBigQueryRoutineSparkOptionsPyFileUris(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
return v
}

func flattenBigQueryRoutineSparkOptionsJarUris(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
return v
}

func flattenBigQueryRoutineSparkOptionsFileUris(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
return v
}

func flattenBigQueryRoutineSparkOptionsArchiveUris(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
return v
}

func flattenBigQueryRoutineSparkOptionsMainClass(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
return v
}

func expandBigQueryRoutineRoutineReference(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {

transformed := make(map[string]interface{})
Expand Down Expand Up @@ -852,3 +1022,132 @@ func expandBigQueryRoutineDescription(v interface{}, d tpgresource.TerraformReso
func expandBigQueryRoutineDeterminismLevel(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
return v, nil
}

func expandBigQueryRoutineSparkOptions(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
l := v.([]interface{})
if len(l) == 0 || l[0] == nil {
return nil, nil
}
raw := l[0]
original := raw.(map[string]interface{})
transformed := make(map[string]interface{})

transformedConnection, err := expandBigQueryRoutineSparkOptionsConnection(original["connection"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedConnection); val.IsValid() && !tpgresource.IsEmptyValue(val) {
transformed["connection"] = transformedConnection
}

transformedRuntimeVersion, err := expandBigQueryRoutineSparkOptionsRuntimeVersion(original["runtime_version"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedRuntimeVersion); val.IsValid() && !tpgresource.IsEmptyValue(val) {
transformed["runtimeVersion"] = transformedRuntimeVersion
}

transformedContainerImage, err := expandBigQueryRoutineSparkOptionsContainerImage(original["container_image"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedContainerImage); val.IsValid() && !tpgresource.IsEmptyValue(val) {
transformed["containerImage"] = transformedContainerImage
}

transformedProperties, err := expandBigQueryRoutineSparkOptionsProperties(original["properties"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedProperties); val.IsValid() && !tpgresource.IsEmptyValue(val) {
transformed["properties"] = transformedProperties
}

transformedMainFileUri, err := expandBigQueryRoutineSparkOptionsMainFileUri(original["main_file_uri"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedMainFileUri); val.IsValid() && !tpgresource.IsEmptyValue(val) {
transformed["mainFileUri"] = transformedMainFileUri
}

transformedPyFileUris, err := expandBigQueryRoutineSparkOptionsPyFileUris(original["py_file_uris"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedPyFileUris); val.IsValid() && !tpgresource.IsEmptyValue(val) {
transformed["pyFileUris"] = transformedPyFileUris
}

transformedJarUris, err := expandBigQueryRoutineSparkOptionsJarUris(original["jar_uris"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedJarUris); val.IsValid() && !tpgresource.IsEmptyValue(val) {
transformed["jarUris"] = transformedJarUris
}

transformedFileUris, err := expandBigQueryRoutineSparkOptionsFileUris(original["file_uris"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedFileUris); val.IsValid() && !tpgresource.IsEmptyValue(val) {
transformed["fileUris"] = transformedFileUris
}

transformedArchiveUris, err := expandBigQueryRoutineSparkOptionsArchiveUris(original["archive_uris"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedArchiveUris); val.IsValid() && !tpgresource.IsEmptyValue(val) {
transformed["archiveUris"] = transformedArchiveUris
}

transformedMainClass, err := expandBigQueryRoutineSparkOptionsMainClass(original["main_class"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedMainClass); val.IsValid() && !tpgresource.IsEmptyValue(val) {
transformed["mainClass"] = transformedMainClass
}

return transformed, nil
}

func expandBigQueryRoutineSparkOptionsConnection(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
return v, nil
}

func expandBigQueryRoutineSparkOptionsRuntimeVersion(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
return v, nil
}

func expandBigQueryRoutineSparkOptionsContainerImage(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
return v, nil
}

func expandBigQueryRoutineSparkOptionsProperties(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (map[string]string, error) {
if v == nil {
return map[string]string{}, nil
}
m := make(map[string]string)
for k, val := range v.(map[string]interface{}) {
m[k] = val.(string)
}
return m, nil
}

func expandBigQueryRoutineSparkOptionsMainFileUri(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
return v, nil
}

func expandBigQueryRoutineSparkOptionsPyFileUris(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
return v, nil
}

func expandBigQueryRoutineSparkOptionsJarUris(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
return v, nil
}

func expandBigQueryRoutineSparkOptionsFileUris(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
return v, nil
}

func expandBigQueryRoutineSparkOptionsArchiveUris(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
return v, nil
}

func expandBigQueryRoutineSparkOptionsMainClass(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
return v, nil
}
Loading

0 comments on commit 1510da6

Please sign in to comment.