Skip to content

Commit

Permalink
Add (sub)network to dataflow job
Browse files Browse the repository at this point in the history
Signed-off-by: Modular Magician <[email protected]>
  • Loading branch information
Ty Larrabee authored and modular-magician committed Apr 22, 2019
1 parent c522616 commit c0c1996
Show file tree
Hide file tree
Showing 2 changed files with 184 additions and 2 deletions.
18 changes: 16 additions & 2 deletions google-beta/resource_dataflow_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,18 @@ func resourceDataflowJob() *schema.Resource {
Optional: true,
ForceNew: true,
},

"network": {
Type: schema.TypeString,
Optional: true,
ForceNew: true,
},

"subnetwork": {
Type: schema.TypeString,
Optional: true,
ForceNew: true,
},
},
}
}
Expand All @@ -121,10 +133,12 @@ func resourceDataflowJobCreate(d *schema.ResourceData, meta interface{}) error {
params := expandStringMap(d, "parameters")

env := dataflow.RuntimeEnvironment{
TempLocation: d.Get("temp_gcs_location").(string),
Zone: zone,
MaxWorkers: int64(d.Get("max_workers").(int)),
Network: d.Get("network").(string),
ServiceAccountEmail: d.Get("service_account_email").(string),
Subnetwork: d.Get("subnetwork").(string),
TempLocation: d.Get("temp_gcs_location").(string),
Zone: zone,
}

request := dataflow.CreateJobFromTemplateRequest{
Expand Down
168 changes: 168 additions & 0 deletions google-beta/resource_dataflow_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,46 @@ func TestAccDataflowJobCreateWithServiceAccount(t *testing.T) {
})
}

func TestAccDataflowJobCreateWithNetwork(t *testing.T) {
t.Parallel()
resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: testAccCheckDataflowJobDestroy,
Steps: []resource.TestStep{
{
Config: testAccDataflowJobWithNetwork,
Check: resource.ComposeTestCheckFunc(
testAccDataflowJobExists(
"google_dataflow_job.big_data"),
testAccDataflowJobHasNetwork(
"google_dataflow_job.big_data"),
),
},
},
})
}

func TestAccDataflowJobCreateWithSubnetwork(t *testing.T) {
t.Parallel()
resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: testAccCheckDataflowJobDestroy,
Steps: []resource.TestStep{
{
Config: testAccDataflowJobWithSubnetwork,
Check: resource.ComposeTestCheckFunc(
testAccDataflowJobExists(
"google_dataflow_job.big_data"),
testAccDataflowJobHasSubnetwork(
"google_dataflow_job.big_data"),
),
},
},
})
}

func testAccCheckDataflowJobDestroy(s *terraform.State) error {
for _, rs := range s.RootModule().Resources {
if rs.Type != "google_dataflow_job" {
Expand Down Expand Up @@ -128,6 +168,70 @@ func testAccDataflowJobExists(n string) resource.TestCheckFunc {
}
}

func testAccDataflowJobHasNetwork(n string) resource.TestCheckFunc {
return func(s *terraform.State) error {
rs, ok := s.RootModule().Resources[n]
if !ok {
return fmt.Errorf("Not found: %s", n)
}
if rs.Primary.ID == "" {
return fmt.Errorf("No ID is set")
}

config := testAccProvider.Meta().(*Config)

job, err := config.clientDataflow.Projects.Jobs.Get(config.Project, rs.Primary.ID).Do()
if err != nil {
return fmt.Errorf("Job does not exist")
}
resource.Retry(1*time.Minute, func() *resource.RetryError {
pools := job.Environment.WorkerPools
if len(pools) < 1 {
return resource.RetryableError(fmt.Errorf("no worker pools for job"))
}
network := pools[0].Network
if network != rs.Primary.Attributes["network"] {
return resource.RetryableError(fmt.Errorf("Network mismatch: %s != %s", network, rs.Primary.Attributes["network"]))
}
return nil
})

return nil
}
}

func testAccDataflowJobHasSubnetwork(n string) resource.TestCheckFunc {
return func(s *terraform.State) error {
rs, ok := s.RootModule().Resources[n]
if !ok {
return fmt.Errorf("Not found: %s", n)
}
if rs.Primary.ID == "" {
return fmt.Errorf("No ID is set")
}

config := testAccProvider.Meta().(*Config)

job, err := config.clientDataflow.Projects.Jobs.Get(config.Project, rs.Primary.ID).Do()
if err != nil {
return fmt.Errorf("Job does not exist")
}
resource.Retry(1*time.Minute, func() *resource.RetryError {
pools := job.Environment.WorkerPools
if len(pools) < 1 {
return resource.RetryableError(fmt.Errorf("no worker pools for job"))
}
subnetwork := pools[0].Subnetwork
if subnetwork != rs.Primary.Attributes["subnetwork"] {
return resource.RetryableError(fmt.Errorf("Subnetwork mismatch: %s != %s", subnetwork, rs.Primary.Attributes["subnetwork"]))
}
return nil
})

return nil
}
}

func testAccDataflowJobHasServiceAccount(n string) resource.TestCheckFunc {
return func(s *terraform.State) error {
rs, ok := s.RootModule().Resources[n]
Expand Down Expand Up @@ -247,6 +351,70 @@ resource "google_dataflow_job" "big_data" {
on_delete = "cancel"
}`, acctest.RandString(10), acctest.RandString(10), getTestProjectFromEnv())

var testAccDataflowJobWithNetwork = fmt.Sprintf(`
resource "google_storage_bucket" "temp" {
name = "dfjob-test-%s-temp"
force_destroy = true
}
resource "google_compute_network" "net" {
name = "dfjob-test-%s-net"
auto_create_subnetworks = true
}
resource "google_dataflow_job" "big_data" {
name = "dfjob-test-%s"
template_gcs_path = "gs://dataflow-templates/wordcount/template_file"
temp_gcs_location = "${google_storage_bucket.temp.url}"
network = "${google_compute_network.net.name}"
parameters = {
inputFile = "gs://dataflow-samples/shakespeare/kinglear.txt"
output = "${google_storage_bucket.temp.url}/output"
}
zone = "us-central1-f"
project = "%s"
on_delete = "cancel"
}`, acctest.RandString(10), acctest.RandString(10), acctest.RandString(10), getTestProjectFromEnv())

var testAccDataflowJobWithSubnetwork = fmt.Sprintf(`
resource "google_storage_bucket" "temp" {
name = "dfjob-test-%s-temp"
force_destroy = true
}
resource "google_compute_network" "net" {
name = "dfjob-test-%s-net"
auto_create_subnetworks = false
}
resource "google_compute_subnetwork" "subnet" {
name = "dfjob-test-%s-subnet"
ip_cidr_range = "10.2.0.0/16"
network = "${google_compute_network.net.self_link}"
}
resource "google_dataflow_job" "big_data" {
name = "dfjob-test-%s"
template_gcs_path = "gs://dataflow-templates/wordcount/template_file"
temp_gcs_location = "${google_storage_bucket.temp.url}"
subnetwork = "${google_compute_subnetwork.subnet.self_link}"
parameters = {
inputFile = "gs://dataflow-samples/shakespeare/kinglear.txt"
output = "${google_storage_bucket.temp.url}/output"
}
zone = "us-central1-f"
project = "%s"
on_delete = "cancel"
}`, acctest.RandString(10), acctest.RandString(10), acctest.RandString(10), acctest.RandString(10), getTestProjectFromEnv())

var testAccDataflowJobWithServiceAccount = fmt.Sprintf(`
resource "google_storage_bucket" "temp" {
name = "dfjob-test-%s-temp"
Expand Down

0 comments on commit c0c1996

Please sign in to comment.