diff --git a/.changelog/9793.txt b/.changelog/9793.txt new file mode 100644 index 0000000000..6da6cf2f01 --- /dev/null +++ b/.changelog/9793.txt @@ -0,0 +1,3 @@ +```release-note:enhancement +bigquery: add `spark_options` field to `google_bigquery_routine` resource +``` diff --git a/google-beta/services/bigquery/resource_bigquery_routine.go b/google-beta/services/bigquery/resource_bigquery_routine.go index e31d9c0d69..d12345b7d9 100644 --- a/google-beta/services/bigquery/resource_bigquery_routine.go +++ b/google-beta/services/bigquery/resource_bigquery_routine.go @@ -147,8 +147,8 @@ imported JAVASCRIPT libraries.`, "language": { Type: schema.TypeString, Optional: true, - ValidateFunc: verify.ValidateEnum([]string{"SQL", "JAVASCRIPT", ""}), - Description: `The language of the routine. Possible values: ["SQL", "JAVASCRIPT"]`, + ValidateFunc: verify.ValidateEnum([]string{"SQL", "JAVASCRIPT", "PYTHON", "JAVA", "SCALA", ""}), + Description: `The language of the routine. Possible values: ["SQL", "JAVASCRIPT", "PYTHON", "JAVA", "SCALA"]`, }, "return_table_type": { Type: schema.TypeString, @@ -176,6 +176,90 @@ d the order of values or replaced STRUCT field type with RECORD field type, we c cannot suppress the recurring diff this causes. As a workaround, we recommend using the schema as returned by the API.`, }, + "spark_options": { + Type: schema.TypeList, + Optional: true, + Description: `Optional. If language is one of "PYTHON", "JAVA", "SCALA", this field stores the options for spark stored procedure.`, + MaxItems: 1, + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + "archive_uris": { + Type: schema.TypeList, + Computed: true, + Optional: true, + Description: `Archive files to be extracted into the working directory of each executor. For more information about Apache Spark, see Apache Spark.`, + Elem: &schema.Schema{ + Type: schema.TypeString, + }, + }, + "connection": { + Type: schema.TypeString, + Optional: true, + Description: `Fully qualified name of the user-provided Spark connection object. +Format: "projects/{projectId}/locations/{locationId}/connections/{connectionId}"`, + }, + "container_image": { + Type: schema.TypeString, + Optional: true, + Description: `Custom container image for the runtime environment.`, + }, + "file_uris": { + Type: schema.TypeList, + Computed: true, + Optional: true, + Description: `Files to be placed in the working directory of each executor. For more information about Apache Spark, see Apache Spark.`, + Elem: &schema.Schema{ + Type: schema.TypeString, + }, + }, + "jar_uris": { + Type: schema.TypeList, + Computed: true, + Optional: true, + Description: `JARs to include on the driver and executor CLASSPATH. For more information about Apache Spark, see Apache Spark.`, + Elem: &schema.Schema{ + Type: schema.TypeString, + }, + }, + "main_class": { + Type: schema.TypeString, + Optional: true, + Description: `The fully qualified name of a class in jarUris, for example, com.example.wordcount. +Exactly one of mainClass and main_jar_uri field should be set for Java/Scala language type.`, + }, + "main_file_uri": { + Type: schema.TypeString, + Optional: true, + Description: `The main file/jar URI of the Spark application. +Exactly one of the definitionBody field and the mainFileUri field must be set for Python. +Exactly one of mainClass and mainFileUri field should be set for Java/Scala language type.`, + }, + "properties": { + Type: schema.TypeMap, + Computed: true, + Optional: true, + Description: `Configuration properties as a set of key/value pairs, which will be passed on to the Spark application. +For more information, see Apache Spark and the procedure option list. +An object containing a list of "key": value pairs. Example: { "name": "wrench", "mass": "1.3kg", "count": "3" }.`, + Elem: &schema.Schema{Type: schema.TypeString}, + }, + "py_file_uris": { + Type: schema.TypeList, + Computed: true, + Optional: true, + Description: `Python files to be placed on the PYTHONPATH for PySpark application. Supported file types: .py, .egg, and .zip. For more information about Apache Spark, see Apache Spark.`, + Elem: &schema.Schema{ + Type: schema.TypeString, + }, + }, + "runtime_version": { + Type: schema.TypeString, + Optional: true, + Description: `Runtime version. If not specified, the default runtime version is used.`, + }, + }, + }, + }, "creation_time": { Type: schema.TypeInt, Computed: true, @@ -267,6 +351,12 @@ func resourceBigQueryRoutineCreate(d *schema.ResourceData, meta interface{}) err } else if v, ok := d.GetOkExists("determinism_level"); !tpgresource.IsEmptyValue(reflect.ValueOf(determinismLevelProp)) && (ok || !reflect.DeepEqual(v, determinismLevelProp)) { obj["determinismLevel"] = determinismLevelProp } + sparkOptionsProp, err := expandBigQueryRoutineSparkOptions(d.Get("spark_options"), d, config) + if err != nil { + return err + } else if v, ok := d.GetOkExists("spark_options"); !tpgresource.IsEmptyValue(reflect.ValueOf(sparkOptionsProp)) && (ok || !reflect.DeepEqual(v, sparkOptionsProp)) { + obj["sparkOptions"] = sparkOptionsProp + } url, err := tpgresource.ReplaceVars(d, config, "{{BigQueryBasePath}}projects/{{project}}/datasets/{{dataset_id}}/routines") if err != nil { @@ -400,6 +490,9 @@ func resourceBigQueryRoutineRead(d *schema.ResourceData, meta interface{}) error if err := d.Set("determinism_level", flattenBigQueryRoutineDeterminismLevel(res["determinismLevel"], d, config)); err != nil { return fmt.Errorf("Error reading Routine: %s", err) } + if err := d.Set("spark_options", flattenBigQueryRoutineSparkOptions(res["sparkOptions"], d, config)); err != nil { + return fmt.Errorf("Error reading Routine: %s", err) + } return nil } @@ -480,6 +573,12 @@ func resourceBigQueryRoutineUpdate(d *schema.ResourceData, meta interface{}) err } else if v, ok := d.GetOkExists("determinism_level"); !tpgresource.IsEmptyValue(reflect.ValueOf(v)) && (ok || !reflect.DeepEqual(v, determinismLevelProp)) { obj["determinismLevel"] = determinismLevelProp } + sparkOptionsProp, err := expandBigQueryRoutineSparkOptions(d.Get("spark_options"), d, config) + if err != nil { + return err + } else if v, ok := d.GetOkExists("spark_options"); !tpgresource.IsEmptyValue(reflect.ValueOf(v)) && (ok || !reflect.DeepEqual(v, sparkOptionsProp)) { + obj["sparkOptions"] = sparkOptionsProp + } url, err := tpgresource.ReplaceVars(d, config, "{{BigQueryBasePath}}projects/{{project}}/datasets/{{dataset_id}}/routines/{{routine_id}}") if err != nil { @@ -727,6 +826,77 @@ func flattenBigQueryRoutineDeterminismLevel(v interface{}, d *schema.ResourceDat return v } +func flattenBigQueryRoutineSparkOptions(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} { + if v == nil { + return nil + } + original := v.(map[string]interface{}) + if len(original) == 0 { + return nil + } + transformed := make(map[string]interface{}) + transformed["connection"] = + flattenBigQueryRoutineSparkOptionsConnection(original["connection"], d, config) + transformed["runtime_version"] = + flattenBigQueryRoutineSparkOptionsRuntimeVersion(original["runtimeVersion"], d, config) + transformed["container_image"] = + flattenBigQueryRoutineSparkOptionsContainerImage(original["containerImage"], d, config) + transformed["properties"] = + flattenBigQueryRoutineSparkOptionsProperties(original["properties"], d, config) + transformed["main_file_uri"] = + flattenBigQueryRoutineSparkOptionsMainFileUri(original["mainFileUri"], d, config) + transformed["py_file_uris"] = + flattenBigQueryRoutineSparkOptionsPyFileUris(original["pyFileUris"], d, config) + transformed["jar_uris"] = + flattenBigQueryRoutineSparkOptionsJarUris(original["jarUris"], d, config) + transformed["file_uris"] = + flattenBigQueryRoutineSparkOptionsFileUris(original["fileUris"], d, config) + transformed["archive_uris"] = + flattenBigQueryRoutineSparkOptionsArchiveUris(original["archiveUris"], d, config) + transformed["main_class"] = + flattenBigQueryRoutineSparkOptionsMainClass(original["mainClass"], d, config) + return []interface{}{transformed} +} +func flattenBigQueryRoutineSparkOptionsConnection(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} { + return v +} + +func flattenBigQueryRoutineSparkOptionsRuntimeVersion(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} { + return v +} + +func flattenBigQueryRoutineSparkOptionsContainerImage(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} { + return v +} + +func flattenBigQueryRoutineSparkOptionsProperties(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} { + return v +} + +func flattenBigQueryRoutineSparkOptionsMainFileUri(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} { + return v +} + +func flattenBigQueryRoutineSparkOptionsPyFileUris(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} { + return v +} + +func flattenBigQueryRoutineSparkOptionsJarUris(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} { + return v +} + +func flattenBigQueryRoutineSparkOptionsFileUris(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} { + return v +} + +func flattenBigQueryRoutineSparkOptionsArchiveUris(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} { + return v +} + +func flattenBigQueryRoutineSparkOptionsMainClass(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} { + return v +} + func expandBigQueryRoutineRoutineReference(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) { transformed := make(map[string]interface{}) @@ -852,3 +1022,132 @@ func expandBigQueryRoutineDescription(v interface{}, d tpgresource.TerraformReso func expandBigQueryRoutineDeterminismLevel(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) { return v, nil } + +func expandBigQueryRoutineSparkOptions(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) { + l := v.([]interface{}) + if len(l) == 0 || l[0] == nil { + return nil, nil + } + raw := l[0] + original := raw.(map[string]interface{}) + transformed := make(map[string]interface{}) + + transformedConnection, err := expandBigQueryRoutineSparkOptionsConnection(original["connection"], d, config) + if err != nil { + return nil, err + } else if val := reflect.ValueOf(transformedConnection); val.IsValid() && !tpgresource.IsEmptyValue(val) { + transformed["connection"] = transformedConnection + } + + transformedRuntimeVersion, err := expandBigQueryRoutineSparkOptionsRuntimeVersion(original["runtime_version"], d, config) + if err != nil { + return nil, err + } else if val := reflect.ValueOf(transformedRuntimeVersion); val.IsValid() && !tpgresource.IsEmptyValue(val) { + transformed["runtimeVersion"] = transformedRuntimeVersion + } + + transformedContainerImage, err := expandBigQueryRoutineSparkOptionsContainerImage(original["container_image"], d, config) + if err != nil { + return nil, err + } else if val := reflect.ValueOf(transformedContainerImage); val.IsValid() && !tpgresource.IsEmptyValue(val) { + transformed["containerImage"] = transformedContainerImage + } + + transformedProperties, err := expandBigQueryRoutineSparkOptionsProperties(original["properties"], d, config) + if err != nil { + return nil, err + } else if val := reflect.ValueOf(transformedProperties); val.IsValid() && !tpgresource.IsEmptyValue(val) { + transformed["properties"] = transformedProperties + } + + transformedMainFileUri, err := expandBigQueryRoutineSparkOptionsMainFileUri(original["main_file_uri"], d, config) + if err != nil { + return nil, err + } else if val := reflect.ValueOf(transformedMainFileUri); val.IsValid() && !tpgresource.IsEmptyValue(val) { + transformed["mainFileUri"] = transformedMainFileUri + } + + transformedPyFileUris, err := expandBigQueryRoutineSparkOptionsPyFileUris(original["py_file_uris"], d, config) + if err != nil { + return nil, err + } else if val := reflect.ValueOf(transformedPyFileUris); val.IsValid() && !tpgresource.IsEmptyValue(val) { + transformed["pyFileUris"] = transformedPyFileUris + } + + transformedJarUris, err := expandBigQueryRoutineSparkOptionsJarUris(original["jar_uris"], d, config) + if err != nil { + return nil, err + } else if val := reflect.ValueOf(transformedJarUris); val.IsValid() && !tpgresource.IsEmptyValue(val) { + transformed["jarUris"] = transformedJarUris + } + + transformedFileUris, err := expandBigQueryRoutineSparkOptionsFileUris(original["file_uris"], d, config) + if err != nil { + return nil, err + } else if val := reflect.ValueOf(transformedFileUris); val.IsValid() && !tpgresource.IsEmptyValue(val) { + transformed["fileUris"] = transformedFileUris + } + + transformedArchiveUris, err := expandBigQueryRoutineSparkOptionsArchiveUris(original["archive_uris"], d, config) + if err != nil { + return nil, err + } else if val := reflect.ValueOf(transformedArchiveUris); val.IsValid() && !tpgresource.IsEmptyValue(val) { + transformed["archiveUris"] = transformedArchiveUris + } + + transformedMainClass, err := expandBigQueryRoutineSparkOptionsMainClass(original["main_class"], d, config) + if err != nil { + return nil, err + } else if val := reflect.ValueOf(transformedMainClass); val.IsValid() && !tpgresource.IsEmptyValue(val) { + transformed["mainClass"] = transformedMainClass + } + + return transformed, nil +} + +func expandBigQueryRoutineSparkOptionsConnection(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) { + return v, nil +} + +func expandBigQueryRoutineSparkOptionsRuntimeVersion(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) { + return v, nil +} + +func expandBigQueryRoutineSparkOptionsContainerImage(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) { + return v, nil +} + +func expandBigQueryRoutineSparkOptionsProperties(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (map[string]string, error) { + if v == nil { + return map[string]string{}, nil + } + m := make(map[string]string) + for k, val := range v.(map[string]interface{}) { + m[k] = val.(string) + } + return m, nil +} + +func expandBigQueryRoutineSparkOptionsMainFileUri(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) { + return v, nil +} + +func expandBigQueryRoutineSparkOptionsPyFileUris(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) { + return v, nil +} + +func expandBigQueryRoutineSparkOptionsJarUris(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) { + return v, nil +} + +func expandBigQueryRoutineSparkOptionsFileUris(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) { + return v, nil +} + +func expandBigQueryRoutineSparkOptionsArchiveUris(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) { + return v, nil +} + +func expandBigQueryRoutineSparkOptionsMainClass(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) { + return v, nil +} diff --git a/google-beta/services/bigquery/resource_bigquery_routine_generated_test.go b/google-beta/services/bigquery/resource_bigquery_routine_generated_test.go index 1be87f690f..64847c8b14 100644 --- a/google-beta/services/bigquery/resource_bigquery_routine_generated_test.go +++ b/google-beta/services/bigquery/resource_bigquery_routine_generated_test.go @@ -170,6 +170,187 @@ resource "google_bigquery_routine" "sproc" { `, context) } +func TestAccBigQueryRoutine_bigQueryRoutinePysparkExample(t *testing.T) { + t.Parallel() + + context := map[string]interface{}{ + "random_suffix": acctest.RandString(t, 10), + } + + acctest.VcrTest(t, resource.TestCase{ + PreCheck: func() { acctest.AccTestPreCheck(t) }, + ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories(t), + CheckDestroy: testAccCheckBigQueryRoutineDestroyProducer(t), + Steps: []resource.TestStep{ + { + Config: testAccBigQueryRoutine_bigQueryRoutinePysparkExample(context), + }, + { + ResourceName: "google_bigquery_routine.pyspark", + ImportState: true, + ImportStateVerify: true, + }, + }, + }) +} + +func testAccBigQueryRoutine_bigQueryRoutinePysparkExample(context map[string]interface{}) string { + return acctest.Nprintf(` +resource "google_bigquery_dataset" "test" { + dataset_id = "tf_test_dataset_id%{random_suffix}" +} + +resource "google_bigquery_connection" "test" { + connection_id = "tf_test_connection_id%{random_suffix}" + location = "US" + spark { } +} + +resource "google_bigquery_routine" "pyspark" { + dataset_id = google_bigquery_dataset.test.dataset_id + routine_id = "tf_test_routine_id%{random_suffix}" + routine_type = "PROCEDURE" + language = "PYTHON" + definition_body = <<-EOS + from pyspark.sql import SparkSession + + spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate() + + # Load data from BigQuery. + words = spark.read.format("bigquery") \ + .option("table", "bigquery-public-data:samples.shakespeare") \ + .load() + words.createOrReplaceTempView("words") + + # Perform word count. + word_count = words.select('word', 'word_count').groupBy('word').sum('word_count').withColumnRenamed("sum(word_count)", "sum_word_count") + word_count.show() + word_count.printSchema() + + # Saving the data to BigQuery + word_count.write.format("bigquery") \ + .option("writeMethod", "direct") \ + .save("wordcount_dataset.wordcount_output") + EOS + spark_options { + connection = google_bigquery_connection.test.name + runtime_version = "2.1" + } +} +`, context) +} + +func TestAccBigQueryRoutine_bigQueryRoutinePysparkMainfileExample(t *testing.T) { + t.Parallel() + + context := map[string]interface{}{ + "random_suffix": acctest.RandString(t, 10), + } + + acctest.VcrTest(t, resource.TestCase{ + PreCheck: func() { acctest.AccTestPreCheck(t) }, + ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories(t), + CheckDestroy: testAccCheckBigQueryRoutineDestroyProducer(t), + Steps: []resource.TestStep{ + { + Config: testAccBigQueryRoutine_bigQueryRoutinePysparkMainfileExample(context), + }, + { + ResourceName: "google_bigquery_routine.pyspark_mainfile", + ImportState: true, + ImportStateVerify: true, + }, + }, + }) +} + +func testAccBigQueryRoutine_bigQueryRoutinePysparkMainfileExample(context map[string]interface{}) string { + return acctest.Nprintf(` +resource "google_bigquery_dataset" "test" { + dataset_id = "tf_test_dataset_id%{random_suffix}" +} + +resource "google_bigquery_connection" "test" { + connection_id = "tf_test_connection_id%{random_suffix}" + location = "US" + spark { } +} + +resource "google_bigquery_routine" "pyspark_mainfile" { + dataset_id = google_bigquery_dataset.test.dataset_id + routine_id = "tf_test_routine_id%{random_suffix}" + routine_type = "PROCEDURE" + language = "PYTHON" + definition_body = "" + spark_options { + connection = google_bigquery_connection.test.name + runtime_version = "2.1" + main_file_uri = "gs://test-bucket/main.py" + py_file_uris = ["gs://test-bucket/lib.py"] + file_uris = ["gs://test-bucket/distribute_in_executor.json"] + archive_uris = ["gs://test-bucket/distribute_in_executor.tar.gz"] + } +} +`, context) +} + +func TestAccBigQueryRoutine_bigQueryRoutineSparkJarExample(t *testing.T) { + t.Parallel() + + context := map[string]interface{}{ + "random_suffix": acctest.RandString(t, 10), + } + + acctest.VcrTest(t, resource.TestCase{ + PreCheck: func() { acctest.AccTestPreCheck(t) }, + ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories(t), + CheckDestroy: testAccCheckBigQueryRoutineDestroyProducer(t), + Steps: []resource.TestStep{ + { + Config: testAccBigQueryRoutine_bigQueryRoutineSparkJarExample(context), + }, + { + ResourceName: "google_bigquery_routine.spark_jar", + ImportState: true, + ImportStateVerify: true, + }, + }, + }) +} + +func testAccBigQueryRoutine_bigQueryRoutineSparkJarExample(context map[string]interface{}) string { + return acctest.Nprintf(` +resource "google_bigquery_dataset" "test" { + dataset_id = "tf_test_dataset_id%{random_suffix}" +} + +resource "google_bigquery_connection" "test" { + connection_id = "tf_test_connection_id%{random_suffix}" + location = "US" + spark { } +} + +resource "google_bigquery_routine" "spark_jar" { + dataset_id = google_bigquery_dataset.test.dataset_id + routine_id = "tf_test_routine_id%{random_suffix}" + routine_type = "PROCEDURE" + language = "SCALA" + definition_body = "" + spark_options { + connection = google_bigquery_connection.test.name + runtime_version = "2.1" + container_image = "gcr.io/my-project-id/my-spark-image:latest" + main_class = "com.google.test.jar.MainClass" + jar_uris = [ "gs://test-bucket/uberjar_spark_spark3.jar" ] + properties = { + "spark.dataproc.scaling.version" : "2", + "spark.reducer.fetchMigratedShuffle.enabled" : "true", + } + } +} +`, context) +} + func testAccCheckBigQueryRoutineDestroyProducer(t *testing.T) func(s *terraform.State) error { return func(s *terraform.State) error { for name, rs := range s.RootModule().Resources { diff --git a/google-beta/services/bigquery/resource_bigquery_routine_test.go b/google-beta/services/bigquery/resource_bigquery_routine_test.go index 5bd2d2ac2d..14019b7f31 100644 --- a/google-beta/services/bigquery/resource_bigquery_routine_test.go +++ b/google-beta/services/bigquery/resource_bigquery_routine_test.go @@ -82,3 +82,96 @@ resource "google_bigquery_routine" "sproc" { } `, dataset, routine) } + +func TestAccBigQueryRoutine_bigQueryRoutineSparkJar_Update(t *testing.T) { + t.Parallel() + + context := map[string]interface{}{ + "random_suffix": acctest.RandString(t, 10), + } + + acctest.VcrTest(t, resource.TestCase{ + PreCheck: func() { acctest.AccTestPreCheck(t) }, + ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories(t), + CheckDestroy: testAccCheckBigQueryRoutineDestroyProducer(t), + Steps: []resource.TestStep{ + { + Config: testAccBigQueryRoutine_bigQueryRoutineSparkJar(context), + }, + { + ResourceName: "google_bigquery_routine.spark_jar", + ImportState: true, + ImportStateVerify: true, + }, + { + Config: testAccBigQueryRoutine_bigQueryRoutineSparkJar_Update(context), + }, + { + ResourceName: "google_bigquery_routine.spark_jar", + ImportState: true, + ImportStateVerify: true, + }, + }, + }) +} + +func testAccBigQueryRoutine_bigQueryRoutineSparkJar(context map[string]interface{}) string { + return acctest.Nprintf(` +resource "google_bigquery_dataset" "test" { + dataset_id = "tf_test_dataset_id%{random_suffix}" +} + +resource "google_bigquery_connection" "test" { + connection_id = "tf_test_connection_id%{random_suffix}" + location = "US" + spark { } +} + +resource "google_bigquery_routine" "spark_jar" { + dataset_id = google_bigquery_dataset.test.dataset_id + routine_id = "tf_test_routine_id%{random_suffix}" + routine_type = "PROCEDURE" + language = "SCALA" + definition_body = "" + spark_options { + connection = google_bigquery_connection.test.name + runtime_version = "2.0" + main_class = "com.google.test.jar.MainClass" + jar_uris = [ "gs://test-bucket/testjar_spark_spark3.jar" ] + } +} +`, context) +} + +func testAccBigQueryRoutine_bigQueryRoutineSparkJar_Update(context map[string]interface{}) string { + return acctest.Nprintf(` +resource "google_bigquery_dataset" "test" { + dataset_id = "tf_test_dataset_id%{random_suffix}" +} + +resource "google_bigquery_connection" "test_updated" { + connection_id = "tf_test_connection_updated_id%{random_suffix}" + location = "US" + spark { } +} + +resource "google_bigquery_routine" "spark_jar" { + dataset_id = google_bigquery_dataset.test.dataset_id + routine_id = "tf_test_routine_id%{random_suffix}" + routine_type = "PROCEDURE" + language = "SCALA" + definition_body = "" + spark_options { + connection = google_bigquery_connection.test_updated.name + runtime_version = "2.1" + container_image = "gcr.io/my-project-id/my-spark-image:latest" + main_class = "com.google.test.jar.MainClassUpdated" + jar_uris = [ "gs://test-bucket/uberjar_spark_spark3_updated.jar" ] + properties = { + "spark.dataproc.scaling.version" : "2", + "spark.reducer.fetchMigratedShuffle.enabled" : "true", + } + } +} +`, context) +} diff --git a/website/docs/r/bigquery_routine.html.markdown b/website/docs/r/bigquery_routine.html.markdown index 8e25a94876..98e99b1303 100644 --- a/website/docs/r/bigquery_routine.html.markdown +++ b/website/docs/r/bigquery_routine.html.markdown @@ -111,6 +111,130 @@ resource "google_bigquery_routine" "sproc" { ] }) } ``` +
+## Example Usage - Big Query Routine Pyspark + + +```hcl +resource "google_bigquery_dataset" "test" { + dataset_id = "dataset_id" +} + +resource "google_bigquery_connection" "test" { + connection_id = "connection_id" + location = "US" + spark { } +} + +resource "google_bigquery_routine" "pyspark" { + dataset_id = google_bigquery_dataset.test.dataset_id + routine_id = "routine_id" + routine_type = "PROCEDURE" + language = "PYTHON" + definition_body = <<-EOS + from pyspark.sql import SparkSession + + spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate() + + # Load data from BigQuery. + words = spark.read.format("bigquery") \ + .option("table", "bigquery-public-data:samples.shakespeare") \ + .load() + words.createOrReplaceTempView("words") + + # Perform word count. + word_count = words.select('word', 'word_count').groupBy('word').sum('word_count').withColumnRenamed("sum(word_count)", "sum_word_count") + word_count.show() + word_count.printSchema() + + # Saving the data to BigQuery + word_count.write.format("bigquery") \ + .option("writeMethod", "direct") \ + .save("wordcount_dataset.wordcount_output") + EOS + spark_options { + connection = google_bigquery_connection.test.name + runtime_version = "2.1" + } +} +``` + +## Example Usage - Big Query Routine Pyspark Mainfile + + +```hcl +resource "google_bigquery_dataset" "test" { + dataset_id = "dataset_id" +} + +resource "google_bigquery_connection" "test" { + connection_id = "connection_id" + location = "US" + spark { } +} + +resource "google_bigquery_routine" "pyspark_mainfile" { + dataset_id = google_bigquery_dataset.test.dataset_id + routine_id = "routine_id" + routine_type = "PROCEDURE" + language = "PYTHON" + definition_body = "" + spark_options { + connection = google_bigquery_connection.test.name + runtime_version = "2.1" + main_file_uri = "gs://test-bucket/main.py" + py_file_uris = ["gs://test-bucket/lib.py"] + file_uris = ["gs://test-bucket/distribute_in_executor.json"] + archive_uris = ["gs://test-bucket/distribute_in_executor.tar.gz"] + } +} +``` + +## Example Usage - Big Query Routine Spark Jar + + +```hcl +resource "google_bigquery_dataset" "test" { + dataset_id = "dataset_id" +} + +resource "google_bigquery_connection" "test" { + connection_id = "connection_id" + location = "US" + spark { } +} + +resource "google_bigquery_routine" "spark_jar" { + dataset_id = google_bigquery_dataset.test.dataset_id + routine_id = "routine_id" + routine_type = "PROCEDURE" + language = "SCALA" + definition_body = "" + spark_options { + connection = google_bigquery_connection.test.name + runtime_version = "2.1" + container_image = "gcr.io/my-project-id/my-spark-image:latest" + main_class = "com.google.test.jar.MainClass" + jar_uris = [ "gs://test-bucket/uberjar_spark_spark3.jar" ] + properties = { + "spark.dataproc.scaling.version" : "2", + "spark.reducer.fetchMigratedShuffle.enabled" : "true", + } + } +} +``` ## Argument Reference @@ -142,7 +266,7 @@ The following arguments are supported: * `language` - (Optional) The language of the routine. - Possible values are: `SQL`, `JAVASCRIPT`. + Possible values are: `SQL`, `JAVASCRIPT`, `PYTHON`, `JAVA`, `SCALA`. * `arguments` - (Optional) @@ -182,6 +306,11 @@ The following arguments are supported: The determinism level of the JavaScript UDF if defined. Possible values are: `DETERMINISM_LEVEL_UNSPECIFIED`, `DETERMINISTIC`, `NOT_DETERMINISTIC`. +* `spark_options` - + (Optional) + Optional. If language is one of "PYTHON", "JAVA", "SCALA", this field stores the options for spark stored procedure. + Structure is [documented below](#nested_spark_options). + * `project` - (Optional) The ID of the project in which the resource belongs. If it is not provided, the provider project is used. @@ -213,6 +342,54 @@ The following arguments are supported: suppress the recurring diff this causes. As a workaround, we recommend using the schema as returned by the API. +The `spark_options` block supports: + +* `connection` - + (Optional) + Fully qualified name of the user-provided Spark connection object. + Format: "projects/{projectId}/locations/{locationId}/connections/{connectionId}" + +* `runtime_version` - + (Optional) + Runtime version. If not specified, the default runtime version is used. + +* `container_image` - + (Optional) + Custom container image for the runtime environment. + +* `properties` - + (Optional) + Configuration properties as a set of key/value pairs, which will be passed on to the Spark application. + For more information, see Apache Spark and the procedure option list. + An object containing a list of "key": value pairs. Example: { "name": "wrench", "mass": "1.3kg", "count": "3" }. + +* `main_file_uri` - + (Optional) + The main file/jar URI of the Spark application. + Exactly one of the definitionBody field and the mainFileUri field must be set for Python. + Exactly one of mainClass and mainFileUri field should be set for Java/Scala language type. + +* `py_file_uris` - + (Optional) + Python files to be placed on the PYTHONPATH for PySpark application. Supported file types: .py, .egg, and .zip. For more information about Apache Spark, see Apache Spark. + +* `jar_uris` - + (Optional) + JARs to include on the driver and executor CLASSPATH. For more information about Apache Spark, see Apache Spark. + +* `file_uris` - + (Optional) + Files to be placed in the working directory of each executor. For more information about Apache Spark, see Apache Spark. + +* `archive_uris` - + (Optional) + Archive files to be extracted into the working directory of each executor. For more information about Apache Spark, see Apache Spark. + +* `main_class` - + (Optional) + The fully qualified name of a class in jarUris, for example, com.example.wordcount. + Exactly one of mainClass and main_jar_uri field should be set for Java/Scala language type. + ## Attributes Reference In addition to the arguments listed above, the following computed attributes are exported: