Dataproc job - add presto config (#5739) (#4171)
* dataproc-job-log-level

* add presto job

* fmt

* desc for presto props

* docs

* docs

* actually add presto config

* presto test

* presto test actually

* docs

* presto expand/flatten

* presto expand/flatten

* fix test

* Update resource_dataproc_job_test.go.erb

Signed-off-by: Modular Magician <[email protected]>
modular-magician authored Apr 1, 2022
1 parent b32e924 commit 97797be
Showing 4 changed files with 234 additions and 6 deletions.
3 changes: 3 additions & 0 deletions .changelog/5739.txt
@@ -0,0 +1,3 @@
```release-note:enhancement
dataproc: adds `presto_config` to `dataproc_job`
```
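
For context, a minimal usage sketch of the new block (resource names and the query are illustrative placeholders, mirroring the acceptance test added below):

```hcl
# Hypothetical minimal use of the new presto_config block.
resource "google_dataproc_job" "presto" {
  region = google_dataproc_cluster.basic.region

  placement {
    cluster_name = google_dataproc_cluster.basic.name
  }

  presto_config {
    query_list = ["SELECT * FROM system.metadata.schema_properties"]
  }
}
```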
131 changes: 125 additions & 6 deletions google-beta/resource_dataproc_job.go
@@ -11,6 +11,8 @@ import (
dataproc "google.golang.org/api/dataproc/v1beta2"
)

var jobTypes = []string{"pyspark_config", "spark_config", "hadoop_config", "hive_config", "pig_config", "sparksql_config", "presto_config"}

func resourceDataprocJob() *schema.Resource {
return &schema.Resource{
Create: resourceDataprocJobCreate,
@@ -175,6 +177,7 @@ func resourceDataprocJob() *schema.Resource {
"hive_config": hiveSchema,
"pig_config": pigSchema,
"sparksql_config": sparkSqlSchema,
"presto_config": prestoSchema,
},
UseJSONNumber: true,
}
@@ -256,6 +259,11 @@ func resourceDataprocJobCreate(d *schema.ResourceData, meta interface{}) error {
submitReq.Job.SparkSqlJob = expandSparkSqlJob(config)
}

if v, ok := d.GetOk("presto_config"); ok {
config := extractFirstMapConfig(v.([]interface{}))
submitReq.Job.PrestoJob = expandPrestoJob(config)
}

// Submit the job
job, err := config.NewDataprocClient(userAgent).Projects.Regions.Jobs.Submit(
project, region, submitReq).Do()
@@ -354,6 +362,12 @@ func resourceDataprocJobRead(d *schema.ResourceData, meta interface{}) error {
return fmt.Errorf("Error setting sparksql_config: %s", err)
}
}

if job.PrestoJob != nil {
if err := d.Set("presto_config", flattenPrestoJob(job.PrestoJob)); err != nil {
return fmt.Errorf("Error setting presto_config: %s", err)
}
}
return nil
}

@@ -436,7 +450,7 @@ var pySparkSchema = &schema.Schema{
ForceNew: true,
MaxItems: 1,
Description: `The config of pySpark job.`,
ExactlyOneOf: []string{"pyspark_config", "spark_config", "hadoop_config", "hive_config", "pig_config", "sparksql_config"},
ExactlyOneOf: jobTypes,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"main_python_file_uri": {
@@ -565,7 +579,7 @@ var sparkSchema = &schema.Schema{
ForceNew: true,
MaxItems: 1,
Description: `The config of the Spark job.`,
ExactlyOneOf: []string{"pyspark_config", "spark_config", "hadoop_config", "hive_config", "pig_config", "sparksql_config"},
ExactlyOneOf: jobTypes,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
// main driver: can be only one of the class | jar_file
@@ -686,7 +700,7 @@ var hadoopSchema = &schema.Schema{
ForceNew: true,
MaxItems: 1,
Description: `The config of Hadoop job`,
ExactlyOneOf: []string{"spark_config", "pyspark_config", "hadoop_config", "hive_config", "pig_config", "sparksql_config"},
ExactlyOneOf: jobTypes,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
// main driver: can be only one of the main_class | main_jar_file_uri
@@ -807,7 +821,7 @@ var hiveSchema = &schema.Schema{
ForceNew: true,
MaxItems: 1,
Description: `The config of hive job`,
ExactlyOneOf: []string{"spark_config", "pyspark_config", "hadoop_config", "hive_config", "pig_config", "sparksql_config"},
ExactlyOneOf: jobTypes,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
// main query: can be only one of query_list | query_file_uri
@@ -913,7 +927,7 @@ var pigSchema = &schema.Schema{
ForceNew: true,
MaxItems: 1,
Description: `The config of pig job.`,
ExactlyOneOf: []string{"spark_config", "pyspark_config", "hadoop_config", "hive_config", "pig_config", "sparksql_config"},
ExactlyOneOf: jobTypes,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
// main query: can be only one of query_list | query_file_uri
@@ -1022,7 +1036,7 @@ var sparkSqlSchema = &schema.Schema{
ForceNew: true,
MaxItems: 1,
Description: `The config of SparkSql job`,
ExactlyOneOf: []string{"spark_config", "pyspark_config", "hadoop_config", "hive_config", "pig_config", "sparksql_config"},
ExactlyOneOf: jobTypes,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
// main query: can be only one of query_list | query_file_uri
@@ -1112,6 +1126,111 @@ func expandSparkSqlJob(config map[string]interface{}) *dataproc.SparkSqlJob {

}

// ---- Presto Job ----

var prestoSchema = &schema.Schema{
Type: schema.TypeList,
Optional: true,
ForceNew: true,
MaxItems: 1,
Description: `The config of presto job`,
ExactlyOneOf: jobTypes,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"client_tags": {
Type: schema.TypeList,
Description: `Presto client tags to attach to this query.`,
Optional: true,
ForceNew: true,
Elem: &schema.Schema{Type: schema.TypeString},
},
"continue_on_failure": {
Type: schema.TypeBool,
Optional: true,
ForceNew: true,
Description: `Whether to continue executing queries if a query fails. Setting to true can be useful when executing independent parallel queries. Defaults to false.`,
},
// main query: can be only one of query_list | query_file_uri
"query_list": {
Type: schema.TypeList,
Optional: true,
ForceNew: true,
Description: `The list of SQL queries or statements to execute as part of the job. Conflicts with query_file_uri`,
Elem: &schema.Schema{Type: schema.TypeString},
ExactlyOneOf: []string{"presto_config.0.query_file_uri", "presto_config.0.query_list"},
},

"query_file_uri": {
Type: schema.TypeString,
Optional: true,
ForceNew: true,
Description: `The HCFS URI of the script that contains SQL queries. Conflicts with query_list`,
ExactlyOneOf: []string{"presto_config.0.query_file_uri", "presto_config.0.query_list"},
},

"properties": {
Type: schema.TypeMap,
Optional: true,
ForceNew: true,
Description: `A mapping of property names to values. Used to set Presto session properties. Equivalent to using the --session flag in the Presto CLI.`,
Elem: &schema.Schema{Type: schema.TypeString},
},
"output_format": {
Type: schema.TypeString,
Optional: true,
ForceNew: true,
Description: `The format in which query output will be displayed. See the Presto documentation for supported output formats.`,
},

"logging_config": loggingConfig,
},
},
}

func flattenPrestoJob(job *dataproc.PrestoJob) []map[string]interface{} {
queries := []string{}
if job.QueryList != nil {
queries = job.QueryList.Queries
}
return []map[string]interface{}{
{
"client_tags": job.ClientTags,
"continue_on_failure": job.ContinueOnFailure,
"query_list": queries,
"query_file_uri": job.QueryFileUri,
"properties": job.Properties,
"output_format": job.OutputFormat,
},
}
}

func expandPrestoJob(config map[string]interface{}) *dataproc.PrestoJob {
job := &dataproc.PrestoJob{}
if v, ok := config["client_tags"]; ok {
job.ClientTags = convertStringArr(v.([]interface{}))
}
if v, ok := config["continue_on_failure"]; ok {
job.ContinueOnFailure = v.(bool)
}
if v, ok := config["query_file_uri"]; ok {
job.QueryFileUri = v.(string)
}
if v, ok := config["query_list"]; ok {
job.QueryList = &dataproc.QueryList{
Queries: convertStringArr(v.([]interface{})),
}
}
if v, ok := config["properties"]; ok {
job.Properties = convertStringMap(v.(map[string]interface{}))
}
if v, ok := config["output_format"]; ok {
job.OutputFormat = v.(string)
}

return job

}

// ---- Other flatten / expand methods ----

func expandLoggingConfig(config map[string]interface{}) *dataproc.LoggingConfig {
74 changes: 74 additions & 0 deletions google-beta/resource_dataproc_job_test.go
@@ -267,6 +267,38 @@ func TestAccDataprocJob_SparkSql(t *testing.T) {
})
}

func TestAccDataprocJob_Presto(t *testing.T) {
t.Parallel()

var job dataproc.Job
rnd := randString(t, 10)
vcrTest(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: testAccCheckDataprocJobDestroyProducer(t),
Steps: []resource.TestStep{
{
Config: testAccDataprocJob_presto(rnd),
Check: resource.ComposeTestCheckFunc(
testAccCheckDataprocJobExists(t, "google_dataproc_job.presto", &job),

// Autogenerated / computed values
resource.TestCheckResourceAttrSet("google_dataproc_job.presto", "reference.0.job_id"),
resource.TestCheckResourceAttrSet("google_dataproc_job.presto", "status.0.state"),
resource.TestCheckResourceAttrSet("google_dataproc_job.presto", "status.0.state_start_time"),

// Unique job config
testAccCheckDataprocJobAttrMatch(
"google_dataproc_job.presto", "presto_config", &job),

// Wait until job completes successfully
testAccCheckDataprocJobCompletesSuccessfully(t, "google_dataproc_job.presto", &job),
),
},
},
})
}

func testAccCheckDataprocJobDestroyProducer(t *testing.T) func(s *terraform.State) error {
return func(s *terraform.State) error {
config := googleProviderConfig(t)
@@ -688,3 +720,45 @@ resource "google_dataproc_job" "sparksql" {
`, rnd)

}

func testAccDataprocJob_presto(rnd string) string {
return fmt.Sprintf(`
resource "google_dataproc_cluster" "basic" {
name = "dproc-job-test-%s"
region = "us-central1"
cluster_config {
# Keep the costs down with smallest config we can get away with
software_config {
override_properties = {
"dataproc:dataproc.allow.zero.workers" = "true"
}
optional_components = ["PRESTO"]
}
master_config {
num_instances = 1
machine_type = "e2-standard-2"
disk_config {
boot_disk_size_gb = 35
}
}
}
}
resource "google_dataproc_job" "presto" {
region = google_dataproc_cluster.basic.region
force_delete = true
placement {
cluster_name = google_dataproc_cluster.basic.name
}
presto_config {
query_list = [
"SELECT * FROM system.metadata.schema_properties"
]
}
}
`, rnd)

}
32 changes: 32 additions & 0 deletions website/docs/r/dataproc_job.html.markdown
@@ -90,6 +90,7 @@ output "pyspark_status" {
* [hive_config](#nested_hive_config) - Submits a Hive job to the cluster
* [pig_config](#nested_pig_config) - Submits a Pig job to the cluster
* [sparksql_config](#nested_sparksql_config) - Submits a Spark SQL job to the cluster
* [presto_config](#nested_presto_config) - Submits a Presto job to the cluster

- - -

@@ -320,6 +321,37 @@ resource "google_dataproc_job" "sparksql" {

* `logging_config.driver_log_levels`- (Required) The per-package log levels for the driver. This may include 'root' package name to configure rootLogger. Examples: 'com.google = FATAL', 'root = INFO', 'org.apache = DEBUG'

<a name="nested_presto_config"></a>The `presto_config` block supports:

```hcl
# Submit a Presto job to the cluster
resource "google_dataproc_job" "presto" {
...
presto_config {
query_list = [
"DROP TABLE IF EXISTS dprocjob_test",
"CREATE TABLE dprocjob_test(bar int)",
"SELECT * FROM dprocjob_test WHERE bar > 2",
]
}
}
```

* `client_tags` - (Optional) Presto client tags to attach to this query.

* `continue_on_failure` - (Optional) Whether to continue executing queries if a query fails. Setting to true can be useful when executing independent parallel queries. Defaults to false.

* `query_list` - (Optional) The list of SQL queries or statements to execute as part of the job.
Conflicts with `query_file_uri`

* `query_file_uri` - (Optional) The HCFS URI of the script that contains SQL queries.
Conflicts with `query_list`

* `properties` - (Optional) A mapping of property names to values. Used to set Presto session properties. Equivalent to using the --session flag in the Presto CLI.

* `output_format` - (Optional) The format in which query output will be displayed. See the Presto documentation for supported output formats.

* `logging_config.driver_log_levels` - (Required) The per-package log levels for the driver. This may include 'root' package name to configure rootLogger. Examples: 'com.google = FATAL', 'root = INFO', 'org.apache = DEBUG'
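
Putting these fields together, a fuller sketch (the client tags, session property, and output format values below are illustrative assumptions, not taken from this change):

```hcl
# Hypothetical example exercising every optional presto_config field.
resource "google_dataproc_job" "presto" {
  region       = google_dataproc_cluster.basic.region
  force_delete = true

  placement {
    cluster_name = google_dataproc_cluster.basic.name
  }

  presto_config {
    client_tags         = ["terraform", "dev"] # placeholder tags
    continue_on_failure = true
    output_format       = "CSV"                # see Presto docs for supported formats

    # Presto session properties; equivalent to the CLI --session flag.
    properties = {
      "query_max_run_time" = "10m"
    }

    query_list = [
      "DROP TABLE IF EXISTS dprocjob_test",
      "CREATE TABLE dprocjob_test(bar int)",
      "SELECT * FROM dprocjob_test WHERE bar > 2",
    ]

    logging_config {
      driver_log_levels = {
        "root" = "INFO"
      }
    }
  }
}
```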

## Attributes Reference

