dataflow job update-by-replacement (#3387) (#2021)
Signed-off-by: Modular Magician <[email protected]>
modular-magician authored Apr 30, 2020
1 parent cb0ab99 commit a9ca4c6
Showing 4 changed files with 220 additions and 32 deletions.
6 changes: 6 additions & 0 deletions .changelog/3387.txt
@@ -0,0 +1,6 @@
```release-note:enhancement
dataflow: Added support for update-by-replacement to `google_dataflow_job`
```
```release-note:enhancement
dataflow: Added drift detection for `google_dataflow_job` `template_gcs_path` and `temp_gcs_location` fields
```
9 changes: 9 additions & 0 deletions google-beta/error_retry_predicates.go
@@ -222,3 +222,12 @@ func isStoragePreconditionError(err error) (bool, string) {
}
return false, ""
}

func isDataflowJobUpdateRetryableError(err error) (bool, string) {
if gerr, ok := err.(*googleapi.Error); ok {
if gerr.Code == 404 && strings.Contains(gerr.Body, "in RUNNING OR DRAINING state") {
return true, "Waiting for job to be in a valid state"
}
}
return false, ""
}
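
For illustration, here is a minimal standalone sketch of how this predicate classifies the transient 404 Dataflow returns while the job being replaced is still transitioning. The error body below is fabricated for the example; real bodies come from the API. The check is inlined so the sketch compiles on its own.

```go
package main

import (
	"fmt"
	"strings"

	"google.golang.org/api/googleapi"
)

// Same check as isDataflowJobUpdateRetryableError above, inlined so the
// sketch compiles on its own.
func retryable(err error) (bool, string) {
	if gerr, ok := err.(*googleapi.Error); ok {
		if gerr.Code == 404 && strings.Contains(gerr.Body, "in RUNNING OR DRAINING state") {
			return true, "Waiting for job to be in a valid state"
		}
	}
	return false, ""
}

func main() {
	// Fabricated error for illustration only.
	err := &googleapi.Error{
		Code: 404,
		Body: "The job named example-job is not in RUNNING OR DRAINING state",
	}
	fmt.Println(retryable(err)) // true Waiting for job to be in a valid state
}
```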
156 changes: 124 additions & 32 deletions google-beta/resource_dataflow_job.go
@@ -6,6 +6,7 @@ import (
"strings"
"time"

"github.com/hashicorp/terraform-plugin-sdk/helper/customdiff"
"github.com/hashicorp/terraform-plugin-sdk/helper/schema"
"github.com/hashicorp/terraform-plugin-sdk/helper/validation"

@@ -44,54 +45,56 @@ func resourceDataflowJob() *schema.Resource {
return &schema.Resource{
Create: resourceDataflowJobCreate,
Read: resourceDataflowJobRead,
Update: resourceDataflowJobUpdateByReplacement,
Delete: resourceDataflowJobDelete,
CustomizeDiff: customdiff.All(
resourceDataflowJobTypeCustomizeDiff,
),
Schema: map[string]*schema.Schema{
"name": {
Type: schema.TypeString,
Required: true,
// ForceNew applies to both stream and batch jobs
ForceNew: true,
},

"template_gcs_path": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
},

"temp_gcs_location": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
},

"zone": {
Type: schema.TypeString,
Optional: true,
// ForceNew applies to both stream and batch jobs
ForceNew: true,
},

"region": {
Type: schema.TypeString,
Optional: true,
// ForceNew applies to both stream and batch jobs
ForceNew: true,
},

"max_workers": {
Type: schema.TypeInt,
Optional: true,
ForceNew: true,
},

"parameters": {
Type: schema.TypeMap,
Optional: true,
ForceNew: true,
},

"labels": {
Type: schema.TypeMap,
Optional: true,
ForceNew: true,
DiffSuppressFunc: resourceDataflowJobLabelDiffSuppress,
},

@@ -100,13 +103,13 @@ func resourceDataflowJob() *schema.Resource {
ValidateFunc: validation.StringInSlice([]string{"cancel", "drain"}, false),
Optional: true,
Default: "drain",
ForceNew: true,
},

"project": {
Type: schema.TypeString,
Optional: true,
Computed: true,
// ForceNew applies to both stream and batch jobs
ForceNew: true,
},

@@ -121,40 +124,34 @@ func resourceDataflowJob() *schema.Resource {
"service_account_email": {
Type: schema.TypeString,
Optional: true,
ForceNew: true,
},

"network": {
Type: schema.TypeString,
Optional: true,
ForceNew: true,
DiffSuppressFunc: compareSelfLinkOrResourceName,
},

"subnetwork": {
Type: schema.TypeString,
Optional: true,
ForceNew: true,
DiffSuppressFunc: compareSelfLinkOrResourceName,
},

"machine_type": {
Type: schema.TypeString,
Optional: true,
ForceNew: true,
},

"ip_configuration": {
Type: schema.TypeString,
Optional: true,
ForceNew: true,
ValidateFunc: validation.StringInSlice([]string{"WORKER_IP_PUBLIC", "WORKER_IP_PRIVATE", ""}, false),
},

"additional_experiments": {
Type: schema.TypeSet,
Optional: true,
ForceNew: true,
Elem: &schema.Schema{
Type: schema.TypeString,
},
@@ -168,6 +165,23 @@ func resourceDataflowJob() *schema.Resource {
}
}

func resourceDataflowJobTypeCustomizeDiff(d *schema.ResourceDiff, meta interface{}) error {
// All changes are ForceNew for batch jobs
if d.Get("type") == "JOB_TYPE_BATCH" {
resourceSchema := resourceDataflowJob().Schema
for field, fieldSchema := range resourceSchema {
// Each key within a map must be checked for a change
if fieldSchema.Type == schema.TypeMap {
resourceDataflowJobIterateMapForceNew(field, d)
} else if d.HasChange(field) {
d.ForceNew(field)
}
}
}

return nil
}

func resourceDataflowJobCreate(d *schema.ResourceData, meta interface{}) error {
config := meta.(*Config)

@@ -176,31 +190,16 @@ func resourceDataflowJobCreate(d *schema.ResourceData, meta interface{}) error {
return err
}

zone, err := getZone(d, config)
if err != nil {
return err
}

region, err := getRegion(d, config)
if err != nil {
return err
}

params := expandStringMap(d, "parameters")
labels := expandStringMap(d, "labels")
additionalExperiments := convertStringSet(d.Get("additional_experiments").(*schema.Set))

env := dataflow.RuntimeEnvironment{
MaxWorkers: int64(d.Get("max_workers").(int)),
Network: d.Get("network").(string),
ServiceAccountEmail: d.Get("service_account_email").(string),
Subnetwork: d.Get("subnetwork").(string),
TempLocation: d.Get("temp_gcs_location").(string),
MachineType: d.Get("machine_type").(string),
IpConfiguration: d.Get("ip_configuration").(string),
AdditionalUserLabels: labels,
Zone: zone,
AdditionalExperiments: additionalExperiments,
env, err := resourceDataflowJobSetupEnv(d, config)
if err != nil {
return err
}

request := dataflow.CreateJobFromTemplateRequest{
@@ -246,6 +245,14 @@ func resourceDataflowJobRead(d *schema.ResourceData, meta interface{}) error {
d.Set("project", project)
d.Set("labels", job.Labels)

sdkPipelineOptions, err := ConvertToMap(job.Environment.SdkPipelineOptions)
if err != nil {
return err
}
optionsMap := sdkPipelineOptions["options"].(map[string]interface{})
d.Set("template_gcs_path", optionsMap["templateLocation"])
d.Set("temp_gcs_location", optionsMap["tempLocation"])

if _, ok := dataflowTerminalStatesMap[job.CurrentState]; ok {
log.Printf("[DEBUG] Removing resource '%s' because it is in state %s.\n", job.Name, job.CurrentState)
d.SetId("")
@@ -256,6 +263,47 @@ func resourceDataflowJobRead(d *schema.ResourceData, meta interface{}) error {
return nil
}

// Streaming job update method. Batch job changes should have been set to ForceNew
// via the custom diff, so they never reach this function.
func resourceDataflowJobUpdateByReplacement(d *schema.ResourceData, meta interface{}) error {
config := meta.(*Config)

project, err := getProject(d, config)
if err != nil {
return err
}

region, err := getRegion(d, config)
if err != nil {
return err
}

params := expandStringMap(d, "parameters")

env, err := resourceDataflowJobSetupEnv(d, config)
if err != nil {
return err
}

request := dataflow.LaunchTemplateParameters{
JobName: d.Get("name").(string),
Parameters: params,
Environment: &env,
Update: true,
}

var response *dataflow.LaunchTemplateResponse
err = retryTimeDuration(func() (updateErr error) {
response, updateErr = resourceDataflowJobLaunchTemplate(config, project, region, d.Get("template_gcs_path").(string), &request)
return updateErr
}, time.Minute*time.Duration(5), isDataflowJobUpdateRetryableError)
if err != nil {
return err
}
d.SetId(response.Job.Id)

return resourceDataflowJobRead(d, meta)
}
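
Outside the provider, the same update-by-replacement call can be made directly against the templates API. A minimal sketch, assuming google.golang.org/api/dataflow/v1b3 and Application Default Credentials; the project, region, job name, parameters, and template path below are hypothetical:

```go
package main

import (
	"context"
	"fmt"
	"log"

	dataflow "google.golang.org/api/dataflow/v1b3"
)

func main() {
	ctx := context.Background()
	svc, err := dataflow.NewService(ctx) // uses Application Default Credentials
	if err != nil {
		log.Fatal(err)
	}
	req := &dataflow.LaunchTemplateParameters{
		JobName:    "example-streaming-job", // must match the name of the running job
		Parameters: map[string]string{"inputTopic": "projects/example/topics/events"},
		Update:     true, // launch as a replacement instead of a new job
	}
	resp, err := svc.Projects.Locations.Templates.Launch("example-project", "us-central1", req).
		GcsPath("gs://example-bucket/templates/example-template").Do()
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("replacement job ID:", resp.Job.Id)
}
```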

func resourceDataflowJobDelete(d *schema.ResourceData, meta interface{}) error {
config := meta.(*Config)

@@ -353,9 +401,9 @@ func resourceDataflowJobCreateJob(config *Config, project string, region string,

func resourceDataflowJobGetJob(config *Config, project string, region string, id string) (*dataflow.Job, error) {
if region == "" {
return config.clientDataflow.Projects.Jobs.Get(project, id).Do()
return config.clientDataflow.Projects.Jobs.Get(project, id).View("JOB_VIEW_ALL").Do()
}
return config.clientDataflow.Projects.Locations.Jobs.Get(project, region, id).Do()
return config.clientDataflow.Projects.Locations.Jobs.Get(project, region, id).View("JOB_VIEW_ALL").Do()
}
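
The switch to JOB_VIEW_ALL matters because the default job view omits Environment.SdkPipelineOptions, which the read function now needs for drift detection on template_gcs_path and temp_gcs_location. A standalone sketch, with hypothetical project, region, and job identifiers:

```go
package main

import (
	"context"
	"fmt"
	"log"

	dataflow "google.golang.org/api/dataflow/v1b3"
)

func main() {
	ctx := context.Background()
	svc, err := dataflow.NewService(ctx)
	if err != nil {
		log.Fatal(err)
	}
	job, err := svc.Projects.Locations.Jobs.
		Get("example-project", "us-central1", "2020-04-30_00_00_00-1234567890").
		View("JOB_VIEW_ALL"). // the default view leaves Environment empty
		Do()
	if err != nil {
		log.Fatal(err)
	}
	if job.Environment != nil {
		// Contains options.templateLocation and options.tempLocation.
		fmt.Printf("%s\n", job.Environment.SdkPipelineOptions)
	}
}
```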

func resourceDataflowJobUpdateJob(config *Config, project string, region string, id string, job *dataflow.Job) (*dataflow.Job, error) {
@@ -364,3 +412,47 @@ func resourceDataflowJobUpdateJob(config *Config, project string, region string,
}
return config.clientDataflow.Projects.Locations.Jobs.Update(project, region, id, job).Do()
}

func resourceDataflowJobLaunchTemplate(config *Config, project string, region string, gcsPath string, request *dataflow.LaunchTemplateParameters) (*dataflow.LaunchTemplateResponse, error) {
if region == "" {
return config.clientDataflow.Projects.Templates.Launch(project, request).GcsPath(gcsPath).Do()
}
return config.clientDataflow.Projects.Locations.Templates.Launch(project, region, request).GcsPath(gcsPath).Do()
}

func resourceDataflowJobSetupEnv(d *schema.ResourceData, config *Config) (dataflow.RuntimeEnvironment, error) {
zone, err := getZone(d, config)
if err != nil {
return dataflow.RuntimeEnvironment{}, err
}

labels := expandStringMap(d, "labels")

additionalExperiments := convertStringSet(d.Get("additional_experiments").(*schema.Set))

env := dataflow.RuntimeEnvironment{
MaxWorkers: int64(d.Get("max_workers").(int)),
Network: d.Get("network").(string),
ServiceAccountEmail: d.Get("service_account_email").(string),
Subnetwork: d.Get("subnetwork").(string),
TempLocation: d.Get("temp_gcs_location").(string),
MachineType: d.Get("machine_type").(string),
IpConfiguration: d.Get("ip_configuration").(string),
AdditionalUserLabels: labels,
Zone: zone,
AdditionalExperiments: additionalExperiments,
}
return env, nil
}

func resourceDataflowJobIterateMapForceNew(mapKey string, d *schema.ResourceDiff) {
obj := d.Get(mapKey).(map[string]interface{})
for k := range obj {
entrySchemaKey := mapKey + "." + k
if d.HasChange(entrySchemaKey) {
// ForceNew must be called on the parent map to trigger replacement;
// individual map entries cannot be marked ForceNew
d.ForceNew(mapKey)
break
}
}
}
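
For intuition, here is a toy stand-in (plain Go, no SDK types; all field names hypothetical) for what the custom diff and this helper do together. The plugin SDK addresses map entries as "<field>.<key>", and ForceNew can only be set on the top-level field, so a changed entry must flag its parent map:

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// Hypothetical changed attribute paths from a plan on a batch job.
	changes := []string{"machine_type", "parameters.inputFile"}
	mapFields := map[string]bool{"parameters": true, "labels": true}

	forceNew := map[string]bool{}
	for _, path := range changes {
		field := path
		if i := strings.Index(path, "."); i >= 0 && mapFields[path[:i]] {
			field = path[:i] // a changed entry forces replacement of the whole map
		}
		forceNew[field] = true
	}
	fmt.Println(forceNew) // map[machine_type:true parameters:true]
}
```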