Skip to content

Commit

Permalink
Add disk size as Dataflow Job Configuration (#841)
Browse files Browse the repository at this point in the history
Co-authored-by: Khor Shu Heng <[email protected]>
  • Loading branch information
khorshuheng and khorshuheng authored Jun 30, 2020
1 parent 7980e33 commit cf5ed2d
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public DataflowRunnerConfig(DataflowRunnerConfigOptions runnerConfigOptions) {
this.tempLocation = runnerConfigOptions.getTempLocation();
this.maxNumWorkers = runnerConfigOptions.getMaxNumWorkers();
this.deadLetterTableSpec = runnerConfigOptions.getDeadLetterTableSpec();
this.diskSizeGb = runnerConfigOptions.getDiskSizeGb();
this.labels = runnerConfigOptions.getLabelsMap();
validate();
}
Expand Down Expand Up @@ -85,6 +86,9 @@ public DataflowRunnerConfig(DataflowRunnerConfigOptions runnerConfigOptions) {
/* BigQuery table spec for the dead-letter sink, e.g. PROJECT_ID:DATASET_ID.TABLE_ID */
public String deadLetterTableSpec;

/* Disk size to use on each remote Compute Engine worker instance */
public Integer diskSizeGb;

/* Labels applied to the Dataflow job; serialized to JSON when converted to pipeline args */
public Map<String, String> labels;

/** Validates Dataflow runner configuration options */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public void shouldConvertToPipelineArgs() throws IllegalAccessException {
.setUsePublicIps(false)
.setWorkerMachineType("n1-standard-1")
.setDeadLetterTableSpec("project_id:dataset_id.table_id")
.setDiskSizeGb(100)
.putLabels("key", "value")
.build();

Expand All @@ -60,9 +61,46 @@ public void shouldConvertToPipelineArgs() throws IllegalAccessException {
"--usePublicIps=false",
"--workerMachineType=n1-standard-1",
"--deadLetterTableSpec=project_id:dataset_id.table_id",
"--diskSizeGb=100",
"--labels={\"key\":\"value\"}")
.toArray(String[]::new);
assertThat(args.size(), equalTo(expectedArgs.length));
assertThat(args, containsInAnyOrder(expectedArgs));
}

@Test
public void shouldIgnoreOptionalArguments() throws IllegalAccessException {
  // Build a config with every optional field (deadLetterTableSpec, diskSizeGb)
  // left unset; only the mandatory options should appear in the emitted args.
  DataflowRunnerConfigOptions options =
      DataflowRunnerConfigOptions.newBuilder()
          .setProject("my-project")
          .setRegion("asia-east1")
          .setZone("asia-east1-a")
          .setTempLocation("gs://bucket/tempLocation")
          .setNetwork("default")
          .setSubnetwork("regions/asia-east1/subnetworks/mysubnetwork")
          .setMaxNumWorkers(1)
          .setAutoscalingAlgorithm("THROUGHPUT_BASED")
          .setUsePublicIps(false)
          .setWorkerMachineType("n1-standard-1")
          .build();

  List<String> actualArgs = Lists.newArrayList(new DataflowRunnerConfig(options).toArgs());

  // Unset optional fields must produce no "--flag=" entries at all;
  // labels defaults to an empty map and serializes as "{}".
  String[] expectedArgs = {
    "--project=my-project",
    "--region=asia-east1",
    "--zone=asia-east1-a",
    "--tempLocation=gs://bucket/tempLocation",
    "--network=default",
    "--subnetwork=regions/asia-east1/subnetworks/mysubnetwork",
    "--maxNumWorkers=1",
    "--autoscalingAlgorithm=THROUGHPUT_BASED",
    "--usePublicIps=false",
    "--workerMachineType=n1-standard-1",
    "--labels={}"
  };
  assertThat(actualArgs.size(), equalTo(expectedArgs.length));
  assertThat(actualArgs, containsInAnyOrder(expectedArgs));
}
}
4 changes: 4 additions & 0 deletions protos/feast/core/Runner.proto
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,8 @@ message DataflowRunnerConfigOptions {

/* Labels to apply to the dataflow job */
map<string, string> labels = 13;

/* Disk size to use on each remote Compute Engine worker instance */
int32 diskSizeGb = 14;

}

0 comments on commit cf5ed2d

Please sign in to comment.