Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dataflow job update-by-replacement #3387

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 124 additions & 32 deletions third_party/terraform/resources/resource_dataflow_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"strings"
"time"

"github.com/hashicorp/terraform-plugin-sdk/helper/customdiff"
"github.com/hashicorp/terraform-plugin-sdk/helper/schema"
"github.com/hashicorp/terraform-plugin-sdk/helper/validation"

Expand Down Expand Up @@ -44,54 +45,56 @@ func resourceDataflowJob() *schema.Resource {
return &schema.Resource{
Create: resourceDataflowJobCreate,
Read: resourceDataflowJobRead,
Update: resourceDataflowJobUpdateByReplacement,
Delete: resourceDataflowJobDelete,
CustomizeDiff: customdiff.All(
resourceDataflowJobTypeCustomizeDiff,
),
Schema: map[string]*schema.Schema{
"name": {
Type: schema.TypeString,
Required: true,
// ForceNew applies to both stream and batch jobs
ForceNew: true,
},

"template_gcs_path": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
},

"temp_gcs_location": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
},

"zone": {
Type: schema.TypeString,
Optional: true,
// ForceNew applies to both stream and batch jobs
ForceNew: true,
},

"region": {
Type: schema.TypeString,
Optional: true,
// ForceNew applies to both stream and batch jobs
ForceNew: true,
},

"max_workers": {
Type: schema.TypeInt,
Optional: true,
ForceNew: true,
},

"parameters": {
Type: schema.TypeMap,
Optional: true,
ForceNew: true,
},

"labels": {
Type: schema.TypeMap,
Optional: true,
ForceNew: true,
DiffSuppressFunc: resourceDataflowJobLabelDiffSuppress,
},

Expand All @@ -100,13 +103,13 @@ func resourceDataflowJob() *schema.Resource {
ValidateFunc: validation.StringInSlice([]string{"cancel", "drain"}, false),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @c2thorn, does this mean that changes to on_delete would also trigger an update-by-replacement?

Copy link
Member Author

@c2thorn c2thorn May 5, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, yes that is true. Previously any changes to on_delete would destroy/recreate both stream and batch jobs. Now it will trigger the update-by-replacement for streaming jobs, while retaining the same destroy/recreate functionality for batch.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh that's interesting that we used to ForceNew on that! It probably should be something that can be updatable completely on its own, without triggering a ForceNew or an update-by-replacement.

Copy link
Member

@jcanseco jcanseco May 5, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yeah I was surprised to see that it used to be ForceNew as well.

I agree that this field should be updatable on its own without triggering a ForceNew or update-by-replacement. Would you like me to file this as a separate issue?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

Optional: true,
Default: "drain",
ForceNew: true,
},

"project": {
Type: schema.TypeString,
Optional: true,
Computed: true,
// ForceNew applies to both stream and batch jobs
ForceNew: true,
},

Expand All @@ -121,40 +124,34 @@ func resourceDataflowJob() *schema.Resource {
"service_account_email": {
Type: schema.TypeString,
Optional: true,
ForceNew: true,
},

"network": {
Type: schema.TypeString,
Optional: true,
ForceNew: true,
DiffSuppressFunc: compareSelfLinkOrResourceName,
},

"subnetwork": {
Type: schema.TypeString,
Optional: true,
ForceNew: true,
DiffSuppressFunc: compareSelfLinkOrResourceName,
},

"machine_type": {
Type: schema.TypeString,
Optional: true,
ForceNew: true,
},

"ip_configuration": {
Type: schema.TypeString,
Optional: true,
ForceNew: true,
ValidateFunc: validation.StringInSlice([]string{"WORKER_IP_PUBLIC", "WORKER_IP_PRIVATE", ""}, false),
},

"additional_experiments": {
Type: schema.TypeSet,
Optional: true,
ForceNew: true,
Elem: &schema.Schema{
Type: schema.TypeString,
},
Expand All @@ -168,6 +165,23 @@ func resourceDataflowJob() *schema.Resource {
}
}

func resourceDataflowJobTypeCustomizeDiff(d *schema.ResourceDiff, meta interface{}) error {
// All changes are ForceNew for batch jobs
if d.Get("type") == "JOB_TYPE_BATCH" {
resourceSchema := resourceDataflowJob().Schema
c2thorn marked this conversation as resolved.
Show resolved Hide resolved
for field, fieldSchema := range resourceSchema {
// Each key within a map must be checked for a change
if fieldSchema.Type == schema.TypeMap {
resourceDataflowJobIterateMapForceNew(field, d)
} else if d.HasChange(field) {
d.ForceNew(field)
}
}
}

return nil
}

func resourceDataflowJobCreate(d *schema.ResourceData, meta interface{}) error {
config := meta.(*Config)

Expand All @@ -176,31 +190,16 @@ func resourceDataflowJobCreate(d *schema.ResourceData, meta interface{}) error {
return err
}

zone, err := getZone(d, config)
if err != nil {
return err
}

region, err := getRegion(d, config)
if err != nil {
return err
}

params := expandStringMap(d, "parameters")
labels := expandStringMap(d, "labels")
additionalExperiments := convertStringSet(d.Get("additional_experiments").(*schema.Set))

env := dataflow.RuntimeEnvironment{
MaxWorkers: int64(d.Get("max_workers").(int)),
Network: d.Get("network").(string),
ServiceAccountEmail: d.Get("service_account_email").(string),
Subnetwork: d.Get("subnetwork").(string),
TempLocation: d.Get("temp_gcs_location").(string),
MachineType: d.Get("machine_type").(string),
IpConfiguration: d.Get("ip_configuration").(string),
AdditionalUserLabels: labels,
Zone: zone,
AdditionalExperiments: additionalExperiments,
env, err := resourceDataflowJobSetupEnv(d, config)
if err != nil {
return err
}

request := dataflow.CreateJobFromTemplateRequest{
Expand Down Expand Up @@ -246,6 +245,14 @@ func resourceDataflowJobRead(d *schema.ResourceData, meta interface{}) error {
d.Set("project", project)
d.Set("labels", job.Labels)

sdkPipelineOptions, err := ConvertToMap(job.Environment.SdkPipelineOptions)
if err != nil {
return err
}
optionsMap := sdkPipelineOptions["options"].(map[string]interface{})
d.Set("template_gcs_path", optionsMap["templateLocation"])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add to the CHANGELOG that we now detect drift on these fields?

d.Set("temp_gcs_location", optionsMap["tempLocation"])

if _, ok := dataflowTerminalStatesMap[job.CurrentState]; ok {
log.Printf("[DEBUG] Removing resource '%s' because it is in state %s.\n", job.Name, job.CurrentState)
d.SetId("")
Expand All @@ -256,6 +263,47 @@ func resourceDataflowJobRead(d *schema.ResourceData, meta interface{}) error {
return nil
}

// Stream update method. Batch job changes should have been set to ForceNew via custom diff.
// resourceDataflowJobUpdateByReplacement launches the configured template with
// Update=true, which replaces the running streaming job in place, then resets
// the resource ID to the replacement job's ID and re-reads state.
func resourceDataflowJobUpdateByReplacement(d *schema.ResourceData, meta interface{}) error {
	config := meta.(*Config)

	project, err := getProject(d, config)
	if err != nil {
		return err
	}

	region, err := getRegion(d, config)
	if err != nil {
		return err
	}

	params := expandStringMap(d, "parameters")

	env, err := resourceDataflowJobSetupEnv(d, config)
	if err != nil {
		return err
	}

	request := dataflow.LaunchTemplateParameters{
		JobName:     d.Get("name").(string),
		Parameters:  params,
		Environment: &env,
		// Update=true asks Dataflow to replace the existing streaming job
		// rather than start an independent one.
		Update: true,
	}

	var response *dataflow.LaunchTemplateResponse
	// Retry transient update errors for up to five minutes.
	err = retryTimeDuration(func() (updateErr error) {
		response, updateErr = resourceDataflowJobLaunchTemplate(config, project, region, d.Get("template_gcs_path").(string), &request)
		return updateErr
	}, 5*time.Minute, isDataflowJobUpdateRetryableError)
	if err != nil {
		return err
	}
	// The replacement job has a new ID; track it from now on.
	d.SetId(response.Job.Id)

	return resourceDataflowJobRead(d, meta)
}

func resourceDataflowJobDelete(d *schema.ResourceData, meta interface{}) error {
config := meta.(*Config)

Expand Down Expand Up @@ -353,9 +401,9 @@ func resourceDataflowJobCreateJob(config *Config, project string, region string,

// resourceDataflowJobGetJob fetches a Dataflow job with full detail
// (JOB_VIEW_ALL). A non-empty region routes the call through the regional
// Locations endpoint; otherwise the legacy non-regional endpoint is used.
func resourceDataflowJobGetJob(config *Config, project string, region string, id string) (*dataflow.Job, error) {
	if region != "" {
		return config.clientDataflow.Projects.Locations.Jobs.Get(project, region, id).View("JOB_VIEW_ALL").Do()
	}
	return config.clientDataflow.Projects.Jobs.Get(project, id).View("JOB_VIEW_ALL").Do()
}

func resourceDataflowJobUpdateJob(config *Config, project string, region string, id string, job *dataflow.Job) (*dataflow.Job, error) {
Expand All @@ -364,3 +412,47 @@ func resourceDataflowJobUpdateJob(config *Config, project string, region string,
}
return config.clientDataflow.Projects.Locations.Jobs.Update(project, region, id, job).Do()
}

// resourceDataflowJobLaunchTemplate launches the template stored at gcsPath
// with the given parameters. A non-empty region routes the call through the
// regional Locations endpoint; otherwise the non-regional endpoint is used.
func resourceDataflowJobLaunchTemplate(config *Config, project string, region string, gcsPath string, request *dataflow.LaunchTemplateParameters) (*dataflow.LaunchTemplateResponse, error) {
	if region != "" {
		return config.clientDataflow.Projects.Locations.Templates.Launch(project, region, request).GcsPath(gcsPath).Do()
	}
	return config.clientDataflow.Projects.Templates.Launch(project, request).GcsPath(gcsPath).Do()
}

// resourceDataflowJobSetupEnv builds the RuntimeEnvironment for a template
// launch from the resource's configured fields. It resolves the zone via
// getZone and returns a zero-value environment alongside any error.
func resourceDataflowJobSetupEnv(d *schema.ResourceData, config *Config) (dataflow.RuntimeEnvironment, error) {
	zone, err := getZone(d, config)
	if err != nil {
		return dataflow.RuntimeEnvironment{}, err
	}

	return dataflow.RuntimeEnvironment{
		MaxWorkers:            int64(d.Get("max_workers").(int)),
		Network:               d.Get("network").(string),
		ServiceAccountEmail:   d.Get("service_account_email").(string),
		Subnetwork:            d.Get("subnetwork").(string),
		TempLocation:          d.Get("temp_gcs_location").(string),
		MachineType:           d.Get("machine_type").(string),
		IpConfiguration:       d.Get("ip_configuration").(string),
		AdditionalUserLabels:  expandStringMap(d, "labels"),
		Zone:                  zone,
		AdditionalExperiments: convertStringSet(d.Get("additional_experiments").(*schema.Set)),
	}, nil
}

// resourceDataflowJobIterateMapForceNew scans every entry of the map field
// mapKey for a pending change and, on the first one found, marks the whole
// map ForceNew (ForceNew must be called on the parent map to trigger).
func resourceDataflowJobIterateMapForceNew(mapKey string, d *schema.ResourceDiff) {
	for entry := range d.Get(mapKey).(map[string]interface{}) {
		if d.HasChange(mapKey + "." + entry) {
			d.ForceNew(mapKey)
			return
		}
	}
}
Loading