From a9ca4c63374fad81adfd447f80103691ec378304 Mon Sep 17 00:00:00 2001
From: The Magician
Date: Thu, 30 Apr 2020 16:16:50 -0400
Subject: [PATCH] dataflow job update-by-replacement (#3387) (#2021)

Signed-off-by: Modular Magician
---
 .changelog/3387.txt                       |   6 +
 google-beta/error_retry_predicates.go     |   9 ++
 google-beta/resource_dataflow_job.go      | 156 +++++++++++++++++-----
 google-beta/resource_dataflow_job_test.go |  81 +++++++++++
 4 files changed, 220 insertions(+), 32 deletions(-)
 create mode 100644 .changelog/3387.txt

diff --git a/.changelog/3387.txt b/.changelog/3387.txt
new file mode 100644
index 0000000000..deffe451a1
--- /dev/null
+++ b/.changelog/3387.txt
@@ -0,0 +1,6 @@
+```release-note:enhancement
+dataflow: Added support for update-by-replacement to `google_dataflow_job`
+```
+```release-note:enhancement
+dataflow: Added drift detection for `google_dataflow_job` `template_gcs_path` and `temp_gcs_location` fields
+```
diff --git a/google-beta/error_retry_predicates.go b/google-beta/error_retry_predicates.go
index d1a4f9b0a4..cec06f8b57 100644
--- a/google-beta/error_retry_predicates.go
+++ b/google-beta/error_retry_predicates.go
@@ -222,3 +222,12 @@ func isStoragePreconditionError(err error) (bool, string) {
 	}
 	return false, ""
 }
+
+func isDataflowJobUpdateRetryableError(err error) (bool, string) {
+	if gerr, ok := err.(*googleapi.Error); ok {
+		if gerr.Code == 404 && strings.Contains(gerr.Body, "in RUNNING OR DRAINING state") {
+			return true, "Waiting for job to be in a valid state"
+		}
+	}
+	return false, ""
+}
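Predicates with this `func(error) (bool, string)` shape are what the provider's generic retry helpers consume; the resource changes below pass this one to `retryTimeDuration`. A minimal sketch of such a loop, with `retryWithPredicates` as a simplified stand-in rather than the provider's actual helper:

```go
// Sketch only: a simplified stand-in for the provider's retryTimeDuration
// helper, showing the contract of predicates like
// isDataflowJobUpdateRetryableError: any predicate that returns true marks
// the error as retryable, and the string explains why.
package retrysketch

import "time"

type retryPredicate func(err error) (bool, string)

func retryWithPredicates(timeout time.Duration, f func() error, predicates ...retryPredicate) error {
	deadline := time.Now().Add(timeout)
	for {
		err := f()
		if err == nil {
			return nil
		}
		retryable := false
		for _, p := range predicates {
			if ok, _ := p(err); ok {
				retryable = true
				break
			}
		}
		if !retryable || time.Now().After(deadline) {
			return err
		}
		time.Sleep(2 * time.Second) // the real helper's backoff differs
	}
}
```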
{ "service_account_email": { Type: schema.TypeString, Optional: true, - ForceNew: true, }, "network": { Type: schema.TypeString, Optional: true, - ForceNew: true, DiffSuppressFunc: compareSelfLinkOrResourceName, }, "subnetwork": { Type: schema.TypeString, Optional: true, - ForceNew: true, DiffSuppressFunc: compareSelfLinkOrResourceName, }, "machine_type": { Type: schema.TypeString, Optional: true, - ForceNew: true, }, "ip_configuration": { Type: schema.TypeString, Optional: true, - ForceNew: true, ValidateFunc: validation.StringInSlice([]string{"WORKER_IP_PUBLIC", "WORKER_IP_PRIVATE", ""}, false), }, "additional_experiments": { Type: schema.TypeSet, Optional: true, - ForceNew: true, Elem: &schema.Schema{ Type: schema.TypeString, }, @@ -168,6 +165,23 @@ func resourceDataflowJob() *schema.Resource { } } +func resourceDataflowJobTypeCustomizeDiff(d *schema.ResourceDiff, meta interface{}) error { + // All changes are ForceNew for batch jobs + if d.Get("type") == "JOB_TYPE_BATCH" { + resourceSchema := resourceDataflowJob().Schema + for field, fieldSchema := range resourceSchema { + // Each key within a map must be checked for a change + if fieldSchema.Type == schema.TypeMap { + resourceDataflowJobIterateMapForceNew(field, d) + } else if d.HasChange(field) { + d.ForceNew(field) + } + } + } + + return nil +} + func resourceDataflowJobCreate(d *schema.ResourceData, meta interface{}) error { config := meta.(*Config) @@ -176,31 +190,16 @@ func resourceDataflowJobCreate(d *schema.ResourceData, meta interface{}) error { return err } - zone, err := getZone(d, config) - if err != nil { - return err - } - region, err := getRegion(d, config) if err != nil { return err } params := expandStringMap(d, "parameters") - labels := expandStringMap(d, "labels") - additionalExperiments := convertStringSet(d.Get("additional_experiments").(*schema.Set)) - env := dataflow.RuntimeEnvironment{ - MaxWorkers: int64(d.Get("max_workers").(int)), - Network: d.Get("network").(string), - ServiceAccountEmail: d.Get("service_account_email").(string), - Subnetwork: d.Get("subnetwork").(string), - TempLocation: d.Get("temp_gcs_location").(string), - MachineType: d.Get("machine_type").(string), - IpConfiguration: d.Get("ip_configuration").(string), - AdditionalUserLabels: labels, - Zone: zone, - AdditionalExperiments: additionalExperiments, + env, err := resourceDataflowJobSetupEnv(d, config) + if err != nil { + return err } request := dataflow.CreateJobFromTemplateRequest{ @@ -246,6 +245,14 @@ func resourceDataflowJobRead(d *schema.ResourceData, meta interface{}) error { d.Set("project", project) d.Set("labels", job.Labels) + sdkPipelineOptions, err := ConvertToMap(job.Environment.SdkPipelineOptions) + if err != nil { + return err + } + optionsMap := sdkPipelineOptions["options"].(map[string]interface{}) + d.Set("template_gcs_path", optionsMap["templateLocation"]) + d.Set("temp_gcs_location", optionsMap["tempLocation"]) + if _, ok := dataflowTerminalStatesMap[job.CurrentState]; ok { log.Printf("[DEBUG] Removing resource '%s' because it is in state %s.\n", job.Name, job.CurrentState) d.SetId("") @@ -256,6 +263,47 @@ func resourceDataflowJobRead(d *schema.ResourceData, meta interface{}) error { return nil } +// Stream update method. 
@@ -176,31 +190,16 @@ func resourceDataflowJobCreate(d *schema.ResourceData, meta interface{}) error {
 		return err
 	}
 
-	zone, err := getZone(d, config)
-	if err != nil {
-		return err
-	}
-
 	region, err := getRegion(d, config)
 	if err != nil {
 		return err
 	}
 
 	params := expandStringMap(d, "parameters")
-	labels := expandStringMap(d, "labels")
-	additionalExperiments := convertStringSet(d.Get("additional_experiments").(*schema.Set))
 
-	env := dataflow.RuntimeEnvironment{
-		MaxWorkers:            int64(d.Get("max_workers").(int)),
-		Network:               d.Get("network").(string),
-		ServiceAccountEmail:   d.Get("service_account_email").(string),
-		Subnetwork:            d.Get("subnetwork").(string),
-		TempLocation:          d.Get("temp_gcs_location").(string),
-		MachineType:           d.Get("machine_type").(string),
-		IpConfiguration:       d.Get("ip_configuration").(string),
-		AdditionalUserLabels:  labels,
-		Zone:                  zone,
-		AdditionalExperiments: additionalExperiments,
+	env, err := resourceDataflowJobSetupEnv(d, config)
+	if err != nil {
+		return err
 	}
 
 	request := dataflow.CreateJobFromTemplateRequest{
@@ -246,6 +245,14 @@ func resourceDataflowJobRead(d *schema.ResourceData, meta interface{}) error {
 	d.Set("project", project)
 	d.Set("labels", job.Labels)
 
+	sdkPipelineOptions, err := ConvertToMap(job.Environment.SdkPipelineOptions)
+	if err != nil {
+		return err
+	}
+	optionsMap := sdkPipelineOptions["options"].(map[string]interface{})
+	d.Set("template_gcs_path", optionsMap["templateLocation"])
+	d.Set("temp_gcs_location", optionsMap["tempLocation"])
+
 	if _, ok := dataflowTerminalStatesMap[job.CurrentState]; ok {
 		log.Printf("[DEBUG] Removing resource '%s' because it is in state %s.\n", job.Name, job.CurrentState)
 		d.SetId("")
@@ -256,6 +263,47 @@ func resourceDataflowJobRead(d *schema.ResourceData, meta interface{}) error {
 	return nil
 }
 
+// Stream update method. Batch job changes should have been set to ForceNew via custom diff
+func resourceDataflowJobUpdateByReplacement(d *schema.ResourceData, meta interface{}) error {
+	config := meta.(*Config)
+
+	project, err := getProject(d, config)
+	if err != nil {
+		return err
+	}
+
+	region, err := getRegion(d, config)
+	if err != nil {
+		return err
+	}
+
+	params := expandStringMap(d, "parameters")
+
+	env, err := resourceDataflowJobSetupEnv(d, config)
+	if err != nil {
+		return err
+	}
+
+	request := dataflow.LaunchTemplateParameters{
+		JobName:     d.Get("name").(string),
+		Parameters:  params,
+		Environment: &env,
+		Update:      true,
+	}
+
+	var response *dataflow.LaunchTemplateResponse
+	err = retryTimeDuration(func() (updateErr error) {
+		response, updateErr = resourceDataflowJobLaunchTemplate(config, project, region, d.Get("template_gcs_path").(string), &request)
+		return updateErr
+	}, time.Minute*time.Duration(5), isDataflowJobUpdateRetryableError)
+	if err != nil {
+		return err
+	}
+	d.SetId(response.Job.Id)
+
+	return resourceDataflowJobRead(d, meta)
+}
+
 func resourceDataflowJobDelete(d *schema.ResourceData, meta interface{}) error {
 	config := meta.(*Config)
 
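The update function above is a thin wrapper over the Dataflow `templates.launch` API with `Update: true`, which replaces the running streaming job under the same job name and returns a new job ID (hence the `d.SetId` call). A standalone sketch of that call against `google.golang.org/api/dataflow/v1b3`; project, region, bucket, and job names are placeholders:

```go
package main

import (
	"context"
	"fmt"
	"log"

	dataflow "google.golang.org/api/dataflow/v1b3"
)

func main() {
	ctx := context.Background()
	svc, err := dataflow.NewService(ctx) // uses application default credentials
	if err != nil {
		log.Fatal(err)
	}

	// Update: true asks Dataflow to replace the running streaming job with
	// this name rather than starting a second job alongside it.
	req := &dataflow.LaunchTemplateParameters{
		JobName: "my-stream-job", // placeholder
		Update:  true,
		Parameters: map[string]string{
			"inputFilePattern": "gs://my-bucket/*.json", // placeholder
		},
	}

	resp, err := svc.Projects.Locations.Templates.Launch("my-project", "us-central1", req).
		GcsPath("gs://dataflow-templates/latest/Stream_GCS_Text_to_Cloud_PubSub").
		Do()
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("replacement job ID:", resp.Job.Id)
}
```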
mapKey + "." + k + if d.HasChange(entrySchemaKey) { + // ForceNew must be called on the parent map to trigger + d.ForceNew(mapKey) + break + } + } +} diff --git a/google-beta/resource_dataflow_job_test.go b/google-beta/resource_dataflow_job_test.go index f0cd127c45..18282249d3 100644 --- a/google-beta/resource_dataflow_job_test.go +++ b/google-beta/resource_dataflow_job_test.go @@ -15,6 +15,7 @@ import ( const ( testDataflowJobTemplateWordCountUrl = "gs://dataflow-templates/latest/Word_Count" testDataflowJobSampleFileUrl = "gs://dataflow-samples/shakespeare/various.txt" + testDataflowJobTemplateTextToPubsub = "gs://dataflow-templates/latest/Stream_GCS_Text_to_Cloud_PubSub" ) func TestAccDataflowJob_basic(t *testing.T) { @@ -206,6 +207,31 @@ func TestAccDataflowJobWithAdditionalExperiments(t *testing.T) { }) } +func TestAccDataflowJob_streamUpdate(t *testing.T) { + t.Parallel() + + suffix := randString(t, 10) + vcrTest(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + Providers: testAccProviders, + CheckDestroy: testAccCheckDataflowJobDestroyProducer(t), + Steps: []resource.TestStep{ + { + Config: testAccDataflowJob_updateStream(suffix, "google_storage_bucket.bucket1.url"), + Check: resource.ComposeTestCheckFunc( + testAccDataflowJobExists(t, "google_dataflow_job.pubsub_stream"), + ), + }, + { + Config: testAccDataflowJob_updateStream(suffix, "google_storage_bucket.bucket2.url"), + Check: resource.ComposeTestCheckFunc( + testAccDataflowJobHasTempLocation(t, "google_dataflow_job.pubsub_stream", "gs://tf-test-bucket2-"+suffix), + ), + }, + }, + }) +} + func testAccCheckDataflowJobDestroyProducer(t *testing.T) func(s *terraform.State) error { return func(s *terraform.State) error { for _, rs := range s.RootModule().Resources { @@ -441,6 +467,36 @@ func testAccDataflowJobHasExperiments(t *testing.T, res string, experiments []st } } +func testAccDataflowJobHasTempLocation(t *testing.T, res, targetLocation string) resource.TestCheckFunc { + return func(s *terraform.State) error { + rs, ok := s.RootModule().Resources[res] + if !ok { + return fmt.Errorf("resource %q not found in state", res) + } + + if rs.Primary.ID == "" { + return fmt.Errorf("No ID is set") + } + config := googleProviderConfig(t) + + job, err := config.clientDataflow.Projects.Jobs.Get(config.Project, rs.Primary.ID).View("JOB_VIEW_ALL").Do() + if err != nil { + return fmt.Errorf("dataflow job does not exist") + } + sdkPipelineOptions, err := ConvertToMap(job.Environment.SdkPipelineOptions) + if err != nil { + return err + } + optionsMap := sdkPipelineOptions["options"].(map[string]interface{}) + + if optionsMap["tempLocation"] != targetLocation { + return fmt.Errorf("Temp locations do not match. 
diff --git a/google-beta/resource_dataflow_job_test.go b/google-beta/resource_dataflow_job_test.go
index f0cd127c45..18282249d3 100644
--- a/google-beta/resource_dataflow_job_test.go
+++ b/google-beta/resource_dataflow_job_test.go
@@ -15,6 +15,7 @@ import (
 const (
 	testDataflowJobTemplateWordCountUrl = "gs://dataflow-templates/latest/Word_Count"
 	testDataflowJobSampleFileUrl        = "gs://dataflow-samples/shakespeare/various.txt"
+	testDataflowJobTemplateTextToPubsub = "gs://dataflow-templates/latest/Stream_GCS_Text_to_Cloud_PubSub"
 )
 
 func TestAccDataflowJob_basic(t *testing.T) {
@@ -206,6 +207,31 @@ func TestAccDataflowJobWithAdditionalExperiments(t *testing.T) {
 	})
 }
 
+func TestAccDataflowJob_streamUpdate(t *testing.T) {
+	t.Parallel()
+
+	suffix := randString(t, 10)
+	vcrTest(t, resource.TestCase{
+		PreCheck:     func() { testAccPreCheck(t) },
+		Providers:    testAccProviders,
+		CheckDestroy: testAccCheckDataflowJobDestroyProducer(t),
+		Steps: []resource.TestStep{
+			{
+				Config: testAccDataflowJob_updateStream(suffix, "google_storage_bucket.bucket1.url"),
+				Check: resource.ComposeTestCheckFunc(
+					testAccDataflowJobExists(t, "google_dataflow_job.pubsub_stream"),
+				),
+			},
+			{
+				Config: testAccDataflowJob_updateStream(suffix, "google_storage_bucket.bucket2.url"),
+				Check: resource.ComposeTestCheckFunc(
+					testAccDataflowJobHasTempLocation(t, "google_dataflow_job.pubsub_stream", "gs://tf-test-bucket2-"+suffix),
+				),
+			},
+		},
+	})
+}
+
 func testAccCheckDataflowJobDestroyProducer(t *testing.T) func(s *terraform.State) error {
 	return func(s *terraform.State) error {
 		for _, rs := range s.RootModule().Resources {
@@ -441,6 +467,36 @@ func testAccDataflowJobHasExperiments(t *testing.T, res string, experiments []st
 	}
 }
 
+func testAccDataflowJobHasTempLocation(t *testing.T, res, targetLocation string) resource.TestCheckFunc {
+	return func(s *terraform.State) error {
+		rs, ok := s.RootModule().Resources[res]
+		if !ok {
+			return fmt.Errorf("resource %q not found in state", res)
+		}
+
+		if rs.Primary.ID == "" {
+			return fmt.Errorf("No ID is set")
+		}
+		config := googleProviderConfig(t)
+
+		job, err := config.clientDataflow.Projects.Jobs.Get(config.Project, rs.Primary.ID).View("JOB_VIEW_ALL").Do()
+		if err != nil {
+			return fmt.Errorf("dataflow job does not exist")
+		}
+		sdkPipelineOptions, err := ConvertToMap(job.Environment.SdkPipelineOptions)
+		if err != nil {
+			return err
+		}
+		optionsMap := sdkPipelineOptions["options"].(map[string]interface{})
+
+		if optionsMap["tempLocation"] != targetLocation {
+			return fmt.Errorf("temp locations do not match: got %s, expected %s", optionsMap["tempLocation"], targetLocation)
+		}
+
+		return nil
+	}
+}
+
 func testAccDataflowJob_zone(bucket, job, zone string) string {
 	return fmt.Sprintf(`
 resource "google_storage_bucket" "temp" {
@@ -662,5 +718,30 @@ resource "google_dataflow_job" "with_additional_experiments" {
 	on_delete = "cancel"
 }
 `, bucket, job, strings.Join(experiments, `", "`), testDataflowJobTemplateWordCountUrl, testDataflowJobSampleFileUrl)
+}
+
+func testAccDataflowJob_updateStream(suffix, tempLocation string) string {
+	return fmt.Sprintf(`
+resource "google_pubsub_topic" "topic" {
+	name = "tf-test-dataflow-job-%s"
+}
+resource "google_storage_bucket" "bucket1" {
+	name          = "tf-test-bucket1-%s"
+	force_destroy = true
+}
+resource "google_storage_bucket" "bucket2" {
+	name          = "tf-test-bucket2-%s"
+	force_destroy = true
+}
+resource "google_dataflow_job" "pubsub_stream" {
+	name              = "tf-test-dataflow-job-%s"
+	template_gcs_path = "%s"
+	temp_gcs_location = %s
+	parameters = {
+		inputFilePattern = "${google_storage_bucket.bucket1.url}/*.json"
+		outputTopic      = google_pubsub_topic.topic.id
+	}
+	on_delete = "cancel"
+}
+`, suffix, suffix, suffix, suffix, testDataflowJobTemplateTextToPubsub, tempLocation)
 }
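The `testAccDataflowJobHasTempLocation` check mirrors what the resource's Read now does: fetch the job with `JOB_VIEW_ALL` (the default view omits the environment) and read the pipeline options out of `Environment.SdkPipelineOptions`. A self-contained sketch of that lookup; IDs are placeholders, and it assumes `SdkPipelineOptions` is the raw JSON message the v1b3 client exposes:

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"

	dataflow "google.golang.org/api/dataflow/v1b3"
)

func main() {
	ctx := context.Background()
	svc, err := dataflow.NewService(ctx) // uses application default credentials
	if err != nil {
		log.Fatal(err)
	}

	// JOB_VIEW_ALL is needed for Environment (and the SDK pipeline options
	// inside it) to be populated in the response.
	job, err := svc.Projects.Locations.Jobs.Get("my-project", "us-central1", "my-job-id").
		View("JOB_VIEW_ALL").Do()
	if err != nil {
		log.Fatal(err)
	}

	var opts map[string]interface{}
	if err := json.Unmarshal(job.Environment.SdkPipelineOptions, &opts); err != nil {
		log.Fatal(err)
	}
	options := opts["options"].(map[string]interface{})
	fmt.Println("templateLocation:", options["templateLocation"])
	fmt.Println("tempLocation:", options["tempLocation"])
}
```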