From 91bca3a4fa37a3d15050e750d9a84de00ed05ea5 Mon Sep 17 00:00:00 2001 From: The Magician Date: Thu, 25 Apr 2019 08:51:18 -0700 Subject: [PATCH] Add (sub)network to dataflow job (#3476) Signed-off-by: Modular Magician --- google/resource_dataflow_job.go | 20 ++- google/resource_dataflow_job_test.go | 184 ++++++++++++++++++++++ website/docs/r/dataflow_job.html.markdown | 3 + 3 files changed, 205 insertions(+), 2 deletions(-) diff --git a/google/resource_dataflow_job.go b/google/resource_dataflow_job.go index c7b2639d8fa..ce9c15e93f8 100644 --- a/google/resource_dataflow_job.go +++ b/google/resource_dataflow_job.go @@ -96,6 +96,20 @@ func resourceDataflowJob() *schema.Resource { Optional: true, ForceNew: true, }, + + "network": { + Type: schema.TypeString, + Optional: true, + ForceNew: true, + DiffSuppressFunc: compareSelfLinkOrResourceName, + }, + + "subnetwork": { + Type: schema.TypeString, + Optional: true, + ForceNew: true, + DiffSuppressFunc: compareSelfLinkOrResourceName, + }, }, } } @@ -121,10 +135,12 @@ func resourceDataflowJobCreate(d *schema.ResourceData, meta interface{}) error { params := expandStringMap(d, "parameters") env := dataflow.RuntimeEnvironment{ - TempLocation: d.Get("temp_gcs_location").(string), - Zone: zone, MaxWorkers: int64(d.Get("max_workers").(int)), + Network: d.Get("network").(string), ServiceAccountEmail: d.Get("service_account_email").(string), + Subnetwork: d.Get("subnetwork").(string), + TempLocation: d.Get("temp_gcs_location").(string), + Zone: zone, } request := dataflow.CreateJobFromTemplateRequest{ diff --git a/google/resource_dataflow_job_test.go b/google/resource_dataflow_job_test.go index bacdb701ce1..a98d62f2d43 100644 --- a/google/resource_dataflow_job_test.go +++ b/google/resource_dataflow_job_test.go @@ -68,6 +68,46 @@ func TestAccDataflowJobCreateWithServiceAccount(t *testing.T) { }) } +func TestAccDataflowJobCreateWithNetwork(t *testing.T) { + t.Parallel() + resource.Test(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + Providers: testAccProviders, + CheckDestroy: testAccCheckDataflowJobDestroy, + Steps: []resource.TestStep{ + { + Config: testAccDataflowJobWithNetwork, + Check: resource.ComposeTestCheckFunc( + testAccDataflowJobExists( + "google_dataflow_job.big_data"), + testAccDataflowJobHasNetwork( + "google_dataflow_job.big_data"), + ), + }, + }, + }) +} + +func TestAccDataflowJobCreateWithSubnetwork(t *testing.T) { + t.Parallel() + resource.Test(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + Providers: testAccProviders, + CheckDestroy: testAccCheckDataflowJobDestroy, + Steps: []resource.TestStep{ + { + Config: testAccDataflowJobWithSubnetwork, + Check: resource.ComposeTestCheckFunc( + testAccDataflowJobExists( + "google_dataflow_job.big_data"), + testAccDataflowJobHasSubnetwork( + "google_dataflow_job.big_data"), + ), + }, + }, + }) +} + func testAccCheckDataflowJobDestroy(s *terraform.State) error { for _, rs := range s.RootModule().Resources { if rs.Type != "google_dataflow_job" { @@ -128,6 +168,86 @@ func testAccDataflowJobExists(n string) resource.TestCheckFunc { } } +func testAccDataflowJobHasNetwork(n string) resource.TestCheckFunc { + return func(s *terraform.State) error { + rs, ok := s.RootModule().Resources[n] + if !ok { + return fmt.Errorf("Not found: %s", n) + } + if rs.Primary.ID == "" { + return fmt.Errorf("No ID is set") + } + + config := testAccProvider.Meta().(*Config) + + var network string + filter := fmt.Sprintf("properties.labels.dataflow_job_id = %s", rs.Primary.ID) + + retryErr := resource.Retry(1*time.Minute, func() *resource.RetryError { + instanceTemplates, err := config.clientCompute.InstanceTemplates.List(config.Project).Filter(filter).MaxResults(2).Fields("items/properties/networkInterfaces/network").Do() + if err != nil { + return resource.NonRetryableError(err) + } + if len(instanceTemplates.Items) == 0 { + return resource.RetryableError(fmt.Errorf("No instance templates for job %s", rs.Primary.ID)) + } + if len(instanceTemplates.Items) > 1 { + return resource.NonRetryableError(fmt.Errorf("Wrong number of matching instance templates for dataflow job: %s, %d", rs.Primary.ID, len(instanceTemplates.Items))) + } + network = instanceTemplates.Items[0].Properties.NetworkInterfaces[0].Network + return nil + }) + + if retryErr != nil { + return fmt.Errorf("Error getting network from instance template: %s", retryErr) + } + if GetResourceNameFromSelfLink(network) != GetResourceNameFromSelfLink(rs.Primary.Attributes["network"]) { + return fmt.Errorf("Network mismatch: %s != %s", network, rs.Primary.Attributes["network"]) + } + return nil + } +} + +func testAccDataflowJobHasSubnetwork(n string) resource.TestCheckFunc { + return func(s *terraform.State) error { + rs, ok := s.RootModule().Resources[n] + if !ok { + return fmt.Errorf("Not found: %s", n) + } + if rs.Primary.ID == "" { + return fmt.Errorf("No ID is set") + } + + config := testAccProvider.Meta().(*Config) + + var subnetwork string + filter := fmt.Sprintf("properties.labels.dataflow_job_id = %s", rs.Primary.ID) + + retryErr := resource.Retry(1*time.Minute, func() *resource.RetryError { + instanceTemplates, err := config.clientCompute.InstanceTemplates.List(config.Project).Filter(filter).MaxResults(2).Fields("items/properties/networkInterfaces/subnetwork").Do() + if err != nil { + return resource.NonRetryableError(err) + } + if len(instanceTemplates.Items) == 0 { + return resource.RetryableError(fmt.Errorf("No instance templates for job %s", rs.Primary.ID)) + } + if len(instanceTemplates.Items) > 1 { + return resource.NonRetryableError(fmt.Errorf("Wrong number of matching instance templates for dataflow job: %s, %d", rs.Primary.ID, len(instanceTemplates.Items))) + } + subnetwork = instanceTemplates.Items[0].Properties.NetworkInterfaces[0].Subnetwork + return nil + }) + + if retryErr != nil { + return fmt.Errorf("Error getting subnetwork from instance template: %s", retryErr) + } + if GetResourceNameFromSelfLink(subnetwork) != GetResourceNameFromSelfLink(rs.Primary.Attributes["subnetwork"]) { + return fmt.Errorf("Subnetwork mismatch: %s != %s", subnetwork, rs.Primary.Attributes["subnetwork"]) + } + return nil + } +} + func testAccDataflowJobHasServiceAccount(n string) resource.TestCheckFunc { return func(s *terraform.State) error { rs, ok := s.RootModule().Resources[n] @@ -247,6 +367,70 @@ resource "google_dataflow_job" "big_data" { on_delete = "cancel" }`, acctest.RandString(10), acctest.RandString(10), getTestProjectFromEnv()) +var testAccDataflowJobWithNetwork = fmt.Sprintf(` +resource "google_storage_bucket" "temp" { + name = "dfjob-test-%s-temp" + + force_destroy = true +} + +resource "google_compute_network" "net" { + name = "dfjob-test-%s-net" + auto_create_subnetworks = true +} + +resource "google_dataflow_job" "big_data" { + name = "dfjob-test-%s" + + template_gcs_path = "gs://dataflow-templates/wordcount/template_file" + temp_gcs_location = "${google_storage_bucket.temp.url}" + network = "${google_compute_network.net.name}" + + parameters = { + inputFile = "gs://dataflow-samples/shakespeare/kinglear.txt" + output = "${google_storage_bucket.temp.url}/output" + } + zone = "us-central1-f" + project = "%s" + + on_delete = "cancel" +}`, acctest.RandString(10), acctest.RandString(10), acctest.RandString(10), getTestProjectFromEnv()) + +var testAccDataflowJobWithSubnetwork = fmt.Sprintf(` +resource "google_storage_bucket" "temp" { + name = "dfjob-test-%s-temp" + + force_destroy = true +} + +resource "google_compute_network" "net" { + name = "dfjob-test-%s-net" + auto_create_subnetworks = false +} + +resource "google_compute_subnetwork" "subnet" { + name = "dfjob-test-%s-subnet" + ip_cidr_range = "10.2.0.0/16" + network = "${google_compute_network.net.self_link}" +} + +resource "google_dataflow_job" "big_data" { + name = "dfjob-test-%s" + + template_gcs_path = "gs://dataflow-templates/wordcount/template_file" + temp_gcs_location = "${google_storage_bucket.temp.url}" + subnetwork = "${google_compute_subnetwork.subnet.self_link}" + + parameters = { + inputFile = "gs://dataflow-samples/shakespeare/kinglear.txt" + output = "${google_storage_bucket.temp.url}/output" + } + zone = "us-central1-f" + project = "%s" + + on_delete = "cancel" +}`, acctest.RandString(10), acctest.RandString(10), acctest.RandString(10), acctest.RandString(10), getTestProjectFromEnv()) + var testAccDataflowJobWithServiceAccount = fmt.Sprintf(` resource "google_storage_bucket" "temp" { name = "dfjob-test-%s-temp" diff --git a/website/docs/r/dataflow_job.html.markdown b/website/docs/r/dataflow_job.html.markdown index e73d143ec57..2bdb15b4f9a 100644 --- a/website/docs/r/dataflow_job.html.markdown +++ b/website/docs/r/dataflow_job.html.markdown @@ -50,6 +50,9 @@ The following arguments are supported: * `project` - (Optional) The project in which the resource belongs. If it is not provided, the provider project is used. * `zone` - (Optional) The zone in which the created job should run. If it is not provided, the provider zone is used. * `service_account_email` - (Optional) The Service Account email used to create the job. +* `network` - (Optional) The network to which VMs will be assigned. If it is not provided, "default" will be used. +* `subnetwork` - (Optional) The subnetwork to which VMs will be assigned. Should be of the form "regions/REGION/subnetworks/SUBNETWORK". + ## Attributes Reference