Dataflow update-by-replacement
c2thorn committed Apr 29, 2020
1 parent 7e909bc commit b479f2d
Showing 3 changed files with 192 additions and 13 deletions.
118 changes: 105 additions & 13 deletions third_party/terraform/resources/resource_dataflow_job.go
@@ -6,6 +6,7 @@ import (
"strings"
"time"

"github.com/hashicorp/terraform-plugin-sdk/helper/customdiff"
"github.com/hashicorp/terraform-plugin-sdk/helper/schema"
"github.com/hashicorp/terraform-plugin-sdk/helper/validation"

@@ -44,54 +45,56 @@ func resourceDataflowJob() *schema.Resource {
return &schema.Resource{
Create: resourceDataflowJobCreate,
Read: resourceDataflowJobRead,
Update: resourceDataflowJobUpdateByReplacement,
Delete: resourceDataflowJobDelete,
CustomizeDiff: customdiff.All(
resourceDataflowJobTypeCustomizeDiff,
),
Schema: map[string]*schema.Schema{
"name": {
Type: schema.TypeString,
Required: true,
// ForceNew applies to both stream and batch jobs
ForceNew: true,
},

"template_gcs_path": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
},

"temp_gcs_location": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
},

"zone": {
Type: schema.TypeString,
Optional: true,
// ForceNew applies to both stream and batch jobs
ForceNew: true,
},

"region": {
Type: schema.TypeString,
Optional: true,
// ForceNew applies to both stream and batch jobs
ForceNew: true,
},

"max_workers": {
Type: schema.TypeInt,
Optional: true,
ForceNew: true,
},

"parameters": {
Type: schema.TypeMap,
Optional: true,
ForceNew: true,
},

"labels": {
Type: schema.TypeMap,
Optional: true,
ForceNew: true,
DiffSuppressFunc: resourceDataflowJobLabelDiffSuppress,
},

@@ -100,13 +103,13 @@ func resourceDataflowJob() *schema.Resource {
ValidateFunc: validation.StringInSlice([]string{"cancel", "drain"}, false),
Optional: true,
Default: "drain",
ForceNew: true,
},

"project": {
Type: schema.TypeString,
Optional: true,
Computed: true,
// ForceNew applies to both stream and batch jobs
ForceNew: true,
},

@@ -121,33 +124,28 @@ func resourceDataflowJob() *schema.Resource {
"service_account_email": {
Type: schema.TypeString,
Optional: true,
ForceNew: true,
},

"network": {
Type: schema.TypeString,
Optional: true,
ForceNew: true,
DiffSuppressFunc: compareSelfLinkOrResourceName,
},

"subnetwork": {
Type: schema.TypeString,
Optional: true,
ForceNew: true,
DiffSuppressFunc: compareSelfLinkOrResourceName,
},

"machine_type": {
Type: schema.TypeString,
Optional: true,
ForceNew: true,
},

"ip_configuration": {
Type: schema.TypeString,
Optional: true,
ForceNew: true,
ValidateFunc: validation.StringInSlice([]string{"WORKER_IP_PUBLIC", "WORKER_IP_PRIVATE", ""}, false),
},

@@ -168,6 +166,22 @@ func resourceDataflowJob() *schema.Resource {
}
}

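// resourceDataflowJobTypeCustomizeDiff marks every changed field as ForceNew
// when the job is a batch job, since batch jobs cannot be updated in place
// and must instead be destroyed and recreated.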
func resourceDataflowJobTypeCustomizeDiff(d *schema.ResourceDiff, meta interface{}) error {
// All changes are ForceNew for batch jobs
if d.Get("type") == "JOB_TYPE_BATCH" {
resourceSchema := resourceDataflowJob().Schema
for field, fieldSchema := range resourceSchema {
if fieldSchema.Type == schema.TypeMap {
resourceDataflowJobIterateMapForceNew(field, d)
} else if d.HasChange(field) {
d.ForceNew(field)
}
}
}

return nil
}

func resourceDataflowJobCreate(d *schema.ResourceData, meta interface{}) error {
config := meta.(*Config)

@@ -246,6 +260,11 @@ func resourceDataflowJobRead(d *schema.ResourceData, meta interface{}) error {
d.Set("project", project)
d.Set("labels", job.Labels)

sdkPipelineOptions, err := ConvertToMap(job.Environment.SdkPipelineOptions)
if err != nil {
return err
}
optionsMap := sdkPipelineOptions["options"].(map[string]interface{})
d.Set("template_gcs_path", optionsMap["templateLocation"])
d.Set("temp_gcs_location", optionsMap["tempLocation"])

if _, ok := dataflowTerminalStatesMap[job.CurrentState]; ok {
log.Printf("[DEBUG] Removing resource '%s' because it is in state %s.\n", job.Name, job.CurrentState)
d.SetId("")
@@ -256,6 +275,60 @@ func resourceDataflowJobRead(d *schema.ResourceData, meta interface{}) error {
return nil
}

// resourceDataflowJobUpdateByReplacement is the streaming-job update path: it
// relaunches the template with Update set so Dataflow replaces the running
// job. Batch job changes should already have been set to ForceNew via the
// custom diff.
func resourceDataflowJobUpdateByReplacement(d *schema.ResourceData, meta interface{}) error {
config := meta.(*Config)

project, err := getProject(d, config)
if err != nil {
return err
}

region, err := getRegion(d, config)
if err != nil {
return err
}

zone, err := getZone(d, config)
if err != nil {
return err
}

params := expandStringMap(d, "parameters")
labels := expandStringMap(d, "labels")

env := dataflow.RuntimeEnvironment{
MaxWorkers: int64(d.Get("max_workers").(int)),
Network: d.Get("network").(string),
ServiceAccountEmail: d.Get("service_account_email").(string),
Subnetwork: d.Get("subnetwork").(string),
TempLocation: d.Get("temp_gcs_location").(string),
MachineType: d.Get("machine_type").(string),
IpConfiguration: d.Get("ip_configuration").(string),
AdditionalUserLabels: labels,
Zone: zone,
}

request := dataflow.LaunchTemplateParameters{
JobName: d.Get("name").(string),
Parameters: params,
Environment: &env,
Update: true,
}

var response *dataflow.LaunchTemplateResponse
err = retryTimeDuration(func() (updateErr error) {
response, updateErr = resourceDataflowJobLaunchTemplate(config, project, region, d.Get("template_gcs_path").(string), &request)
return updateErr
}, 5*time.Minute, isDataflowJobUpdateRetryableError)
if err != nil {
return err
}
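// The relaunched job replaces the old one and comes back with a new job ID,
// so re-point the resource at the replacement.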
d.SetId(response.Job.Id)

return resourceDataflowJobRead(d, meta)
}

func resourceDataflowJobDelete(d *schema.ResourceData, meta interface{}) error {
config := meta.(*Config)

@@ -353,9 +426,9 @@ func resourceDataflowJobCreateJob(config *Config, project string, region string,

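// JOB_VIEW_ALL is requested so the response includes the job's Environment
// (notably SdkPipelineOptions), which Read uses to repopulate
// template_gcs_path and temp_gcs_location.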
func resourceDataflowJobGetJob(config *Config, project string, region string, id string) (*dataflow.Job, error) {
if region == "" {
return config.clientDataflow.Projects.Jobs.Get(project, id).Do()
return config.clientDataflow.Projects.Jobs.Get(project, id).View("JOB_VIEW_ALL").Do()
}
return config.clientDataflow.Projects.Locations.Jobs.Get(project, region, id).Do()
return config.clientDataflow.Projects.Locations.Jobs.Get(project, region, id).View("JOB_VIEW_ALL").Do()
}

func resourceDataflowJobUpdateJob(config *Config, project string, region string, id string, job *dataflow.Job) (*dataflow.Job, error) {
@@ -364,3 +437,22 @@ func resourceDataflowJobUpdateJob(config *Config, project string, region string,
}
return config.clientDataflow.Projects.Locations.Jobs.Update(project, region, id, job).Do()
}

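// resourceDataflowJobLaunchTemplate launches the template at gcsPath, using
// the regional Templates API when a region is set and the global API otherwise.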
func resourceDataflowJobLaunchTemplate(config *Config, project string, region string, gcsPath string, request *dataflow.LaunchTemplateParameters) (*dataflow.LaunchTemplateResponse, error) {
if region == "" {
return config.clientDataflow.Projects.Templates.Launch(project, request).GcsPath(gcsPath).Do()
}
return config.clientDataflow.Projects.Locations.Templates.Launch(project, region, request).GcsPath(gcsPath).Do()
}

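// resourceDataflowJobIterateMapForceNew walks the entries of a map-typed
// field and forces recreation if any entry changed, since ForceNew can only
// be applied to the map as a whole.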
func resourceDataflowJobIterateMapForceNew(mapKey string, d *schema.ResourceDiff) {
obj := d.Get(mapKey).(map[string]interface{})
for k := range obj {
entrySchemaKey := mapKey + "." + k
if d.HasChange(entrySchemaKey) {
// ForceNew must be called on the parent map to trigger
d.ForceNew(mapKey)
break
}
}
}
78 changes: 78 additions & 0 deletions third_party/terraform/tests/resource_dataflow_job_test.go
@@ -15,6 +15,7 @@ import (
const (
testDataflowJobTemplateWordCountUrl = "gs://dataflow-templates/latest/Word_Count"
testDataflowJobSampleFileUrl = "gs://dataflow-samples/shakespeare/various.txt"
testDataflowJobTemplateTextToPubsub = "gs://dataflow-templates/latest/Stream_GCS_Text_to_Cloud_PubSub"
)

func TestAccDataflowJob_basic(t *testing.T) {
@@ -206,6 +207,31 @@ func TestAccDataflowJobWithAdditionalExperiments(t *testing.T) {
})
}

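// TestAccDataflowJob_streamUpdate launches a streaming job, then switches its
// temp_gcs_location from bucket1 to bucket2 and verifies that the job picked
// up the new temp location via update-by-replacement.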
func TestAccDataflowJob_streamUpdate(t *testing.T) {
t.Parallel()

suffix := randString(t, 10)
vcrTest(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: testAccCheckDataflowJobDestroyProducer(t),
Steps: []resource.TestStep{
{
Config: testAccDataflowJob_updateStream(suffix, "google_storage_bucket.bucket1.url"),
Check: resource.ComposeTestCheckFunc(
testAccDataflowJobExists(t, "google_dataflow_job.pubsub_stream"),
),
},
{
Config: testAccDataflowJob_updateStream(suffix, "google_storage_bucket.bucket2.url"),
Check: resource.ComposeTestCheckFunc(
testAccDataflowJobHasUpdated(t, "google_dataflow_job.pubsub_stream", "gs://tf-test-bucket2-"+suffix),
),
},
},
})
}

func testAccCheckDataflowJobDestroyProducer(t *testing.T) func(s *terraform.State) error {
return func(s *terraform.State) error {
for _, rs := range s.RootModule().Resources {
@@ -441,6 +467,33 @@ func testAccDataflowJobHasExperiments(t *testing.T, res string, experiments []st
}
}

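// testAccDataflowJobHasUpdated fetches the job with JOB_VIEW_ALL and checks
// that its tempLocation pipeline option matches the expected bucket URL.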
func testAccDataflowJobHasUpdated(t *testing.T, res, targetLocation string) resource.TestCheckFunc {
return func(s *terraform.State) error {
rs, ok := s.RootModule().Resources[res]
if !ok {
return fmt.Errorf("resource %q not found in state", res)
}

if rs.Primary.ID == "" {
return fmt.Errorf("No ID is set")
}
config := googleProviderConfig(t)

job, err := config.clientDataflow.Projects.Jobs.Get(config.Project, rs.Primary.ID).View("JOB_VIEW_ALL").Do()
if err != nil {
return fmt.Errorf("dataflow job does not exist: %v", err)
}
sdkPipelineOptions, err := ConvertToMap(job.Environment.SdkPipelineOptions)
if err != nil {
return err
}
optionsMap := sdkPipelineOptions["options"].(map[string]interface{})

if optionsMap["tempLocation"] != targetLocation {
return fmt.Errorf("Temp locations do not match. Got %s while expecting %s", optionsMap["tempLocation"], targetLocation)
}

return nil
}
}

func testAccDataflowJob_zone(bucket, job, zone string) string {
return fmt.Sprintf(`
resource "google_storage_bucket" "temp" {
Expand Down Expand Up @@ -662,5 +715,30 @@ resource "google_dataflow_job" "with_additional_experiments" {
on_delete = "cancel"
}
`, bucket, job, strings.Join(experiments, `", "`), testDataflowJobTemplateWordCountUrl, testDataflowJobSampleFileUrl)
}

func testAccDataflowJob_updateStream(suffix, tempLocation string) string {
return fmt.Sprintf(`
resource "google_pubsub_topic" "topic" {
name = "tf-test-dataflow-job-%s"
}
resource "google_storage_bucket" "bucket1" {
name = "tf-test-bucket1-%s"
force_destroy = true
}
resource "google_storage_bucket" "bucket2" {
name = "tf-test-bucket2-%s"
force_destroy = true
}
resource "google_dataflow_job" "pubsub_stream" {
name = "tf-test-dataflow-job-%s"
template_gcs_path = "%s"
temp_gcs_location = %s
parameters = {
inputFilePattern = "${google_storage_bucket.bucket1.url}/*.json"
outputTopic = google_pubsub_topic.topic.id
}
on_delete = "cancel"
}
`, suffix, suffix, suffix, suffix, testDataflowJobTemplateTextToPubsub, tempLocation)
}
9 changes: 9 additions & 0 deletions third_party/terraform/utils/error_retry_predicates.go
@@ -222,3 +222,12 @@ func isStoragePreconditionError(err error) (bool, string) {
}
return false, ""
}

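// isDataflowJobUpdateRetryableError retries an update-by-replacement launch
// that fails with a 404 because the job being replaced is not yet in a
// RUNNING or DRAINING state.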
func isDataflowJobUpdateRetryableError(err error) (bool, string) {
if gerr, ok := err.(*googleapi.Error); ok {
if gerr.Code == 404 && strings.Contains(gerr.Body, "in RUNNING OR DRAINING state") {
return true, "Waiting for job to be in a valid state"
}
}
return false, ""
}
