google_dataflow_job - when updating, wait for new job to start #3591

Merged
38 changes: 38 additions & 0 deletions third_party/terraform/resources/resource_dataflow_job.go
@@ -47,6 +47,9 @@ func resourceDataflowJob() *schema.Resource {
Read: resourceDataflowJobRead,
Update: resourceDataflowJobUpdateByReplacement,
Delete: resourceDataflowJobDelete,
Timeouts: &schema.ResourceTimeout{
Update: schema.DefaultTimeout(10 * time.Minute),
},
CustomizeDiff: customdiff.All(
resourceDataflowJobTypeCustomizeDiff,
),
@@ -309,6 +312,11 @@ func resourceDataflowJobUpdateByReplacement(d *schema.ResourceData, meta interface{}) error {
if err != nil {
return err
}

if err := waitForDataflowJobToBeUpdated(d, config, response.Job.Id, d.Timeout(schema.TimeoutUpdate)); err != nil {
return fmt.Errorf("Error updating job with job ID %q: %v", d.Id(), err)
}

d.SetId(response.Job.Id)

return resourceDataflowJobRead(d, meta)
@@ -499,3 +507,33 @@ func resourceDataflowJobIsVirtualUpdate(d *schema.ResourceData) bool {

return false
}

func waitForDataflowJobToBeUpdated(d *schema.ResourceData, config *Config, replacementJobID string, timeout time.Duration) error {
return resource.Retry(timeout, func() *resource.RetryError {
project, err := getProject(d, config)
if err != nil {
return resource.NonRetryableError(err)
}

region, err := getRegion(d, config)
if err != nil {
return resource.NonRetryableError(err)
Member Author: @emilymye @c2thorn, @spew brought up the following about this line:

> This seems like a place where we would want to retry and thus not return NonRetryableError? For example, transient errors such as service 500s, TLS handshake failures, etc. will, I believe, be returned by resourceDataflowJobGetJob(...), since that function uses the GCP APIs directly.

Thoughts?
Member: Sounds plausible, but I haven't seen any such errors in practice. It doesn't hurt to retry if we know specifically which errors we want to retry on.

Contributor: It has occurred for many resources in the past. Most cases were fixed by running calls through the retry functions in retry_utils.go with the defaultRetryPredicates in error_retry_predicates.

Contributor: I suggest using the default retry predicates.

Member Author: OK, I'll put out a patch for Magic Modules; then, if it looks good to the Terraform team, I can bring that patch to KCC's copy of Terraform to ensure we stay in sync.

Member: Sounds good @jcanseco!

Member Author: Also, apologies: I just realized I commented on the "NonRetryableError" for getRegion() when I meant to comment on the one for resourceDataflowJobGetJob(). It might have been obvious, but I thought I should clarify.

}

replacementJob, err := resourceDataflowJobGetJob(config, project, region, replacementJobID)
if err != nil {
return resource.NonRetryableError(err)
}

state := replacementJob.CurrentState
switch state {
case "", "JOB_STATE_PENDING":
return resource.RetryableError(fmt.Errorf("the replacement job with ID %q has pending state %q.", replacementJobID, state))
case "JOB_STATE_FAILED":
return resource.NonRetryableError(fmt.Errorf("the replacement job with ID %q failed with state %q.", replacementJobID, state))
default:
Member: Suggested change:

-        default:
+        case "":
+            return resource.RetryableError(fmt.Errorf("the replacement job with ID %q does not have a defined state yet. Retrying.", replacementJobID))
+        default:

Found a case where the state just comes back empty before eventually reaching JOB_STATE_FAILED. Adding a retry here gets us to the failed state.

Contributor: We can also combine this with "JOB_STATE_PENDING" and change the message to "has pending state %q".

Member Author: Great catch! Fixed.

log.Printf("[DEBUG] the replacement job with ID %q has state %q.", replacementJobID, state)
return nil
}
})
}
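The retry-predicate discussion above can be sketched in isolation. The snippet below is a minimal, self-contained model of the polling loop — not the provider's actual code — in which the GetJob call is classified as retryable or not before deciding whether to keep polling. `transientError`, `isRetryable`, `pollJobState`, and `getState` are all hypothetical names standing in for the provider's default retry predicates (error_retry_predicates / retry_utils.go) and for resourceDataflowJobGetJob.

```go
package main

import (
	"errors"
	"fmt"
)

// transientError marks failures (HTTP 500s, TLS handshake resets, etc.)
// that, per the review discussion, should be retried rather than
// surfaced as a non-retryable error.
type transientError struct{ msg string }

func (e *transientError) Error() string { return e.msg }

// isRetryable is a stand-in for the provider's default retry predicates;
// the real predicates inspect googleapi.Error codes.
func isRetryable(err error) bool {
	var t *transientError
	return errors.As(err, &t)
}

// pollJobState sketches the waitForDataflowJobToBeUpdated logic, with
// retryable classification applied to the (mocked) GetJob call.
func pollJobState(getState func() (string, error), maxAttempts int) (string, error) {
	for i := 0; i < maxAttempts; i++ {
		state, err := getState()
		if err != nil {
			if isRetryable(err) {
				continue // transient: retry instead of failing the update
			}
			return "", err // permanent: give up immediately
		}
		switch state {
		case "", "JOB_STATE_PENDING":
			continue // replacement job not running yet; keep polling
		case "JOB_STATE_FAILED":
			return state, fmt.Errorf("replacement job failed")
		default:
			return state, nil // e.g. JOB_STATE_RUNNING
		}
	}
	return "", fmt.Errorf("timed out waiting for replacement job")
}

func main() {
	calls := 0
	// Simulate one transient 500, then a pending state, then running.
	getState := func() (string, error) {
		calls++
		switch calls {
		case 1:
			return "", &transientError{"googleapi: Error 500: backend error"}
		case 2:
			return "JOB_STATE_PENDING", nil
		default:
			return "JOB_STATE_RUNNING", nil
		}
	}
	state, err := pollJobState(getState, 10)
	fmt.Println(state, err) // Prints: JOB_STATE_RUNNING <nil>
}
```

Note this treats an empty state as retryable, matching the reviewer's "Suggested change" above, and bounds the loop by attempt count where the real code bounds it by the resource's update timeout.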