From f9ce8ee7291a1b9c0ead061069715db26bd604ab Mon Sep 17 00:00:00 2001 From: Chen Zhiling Date: Mon, 1 Jun 2020 10:32:59 +0800 Subject: [PATCH] Refactor runner configuration, add labels to dataflow options (#718) * Refactor runner configuration, add labels to dataflow options * Remove util method * Add javadocs for RunnerConfig class * Apply spotless --- .../feast/core/config/FeastProperties.java | 2 +- .../java/feast/core/config/JobConfig.java | 24 +++++- .../core/job/dataflow/DataflowJobManager.java | 23 +++--- .../job/dataflow/DataflowRunnerConfig.java | 78 ++++++++----------- .../core/job/direct/DirectRunnerConfig.java | 36 +++++++++ .../job/direct/DirectRunnerJobManager.java | 15 ++-- .../feast/core/job/option/RunnerConfig.java | 75 ++++++++++++++++++ .../java/feast/core/util/TypeConversion.java | 16 ---- .../job/dataflow/DataflowJobManagerTest.java | 23 +++--- .../direct/DirectRunnerJobManagerTest.java | 9 +-- .../feast/core/util/TypeConversionTest.java | 12 --- protos/feast/core/Runner.proto | 3 + 12 files changed, 202 insertions(+), 114 deletions(-) create mode 100644 core/src/main/java/feast/core/job/direct/DirectRunnerConfig.java create mode 100644 core/src/main/java/feast/core/job/option/RunnerConfig.java diff --git a/core/src/main/java/feast/core/config/FeastProperties.java b/core/src/main/java/feast/core/config/FeastProperties.java index eb50728baf..6dad278242 100644 --- a/core/src/main/java/feast/core/config/FeastProperties.java +++ b/core/src/main/java/feast/core/config/FeastProperties.java @@ -103,7 +103,7 @@ public static class Runner { * Job runner configuration options. See the following for options * https://api.docs.feast.dev/grpc/feast.core.pb.html#Runner */ - Map options = new HashMap<>(); + Map options = new HashMap<>(); /** * Gets the job runner type as an enum. 
diff --git a/core/src/main/java/feast/core/config/JobConfig.java b/core/src/main/java/feast/core/config/JobConfig.java index 69636963be..30023de064 100644 --- a/core/src/main/java/feast/core/config/JobConfig.java +++ b/core/src/main/java/feast/core/config/JobConfig.java @@ -16,11 +16,16 @@ */ package feast.core.config; +import com.google.gson.Gson; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.util.JsonFormat; import feast.core.config.FeastProperties.JobProperties; import feast.core.job.JobManager; import feast.core.job.dataflow.DataflowJobManager; import feast.core.job.direct.DirectJobRegistry; import feast.core.job.direct.DirectRunnerJobManager; +import feast.proto.core.RunnerProto.DataflowRunnerConfigOptions; +import feast.proto.core.RunnerProto.DirectRunnerConfigOptions; import java.util.Map; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -31,6 +36,7 @@ @Slf4j @Configuration public class JobConfig { + private final Gson gson = new Gson(); /** * Get a JobManager according to the runner type and Dataflow configuration. 
@@ -39,18 +45,28 @@ public class JobConfig { */ @Bean @Autowired - public JobManager getJobManager(FeastProperties feastProperties) { + public JobManager getJobManager(FeastProperties feastProperties) + throws InvalidProtocolBufferException { JobProperties jobProperties = feastProperties.getJobs(); FeastProperties.JobProperties.Runner runner = jobProperties.getActiveRunner(); - Map runnerConfigOptions = runner.getOptions(); + Map runnerConfigOptions = runner.getOptions(); + String configJson = gson.toJson(runnerConfigOptions); + FeastProperties.MetricsProperties metrics = jobProperties.getMetrics(); switch (runner.getType()) { case DATAFLOW: - return new DataflowJobManager(runnerConfigOptions, metrics); + DataflowRunnerConfigOptions.Builder dataflowRunnerConfigOptions = + DataflowRunnerConfigOptions.newBuilder(); + JsonFormat.parser().merge(configJson, dataflowRunnerConfigOptions); + return new DataflowJobManager(dataflowRunnerConfigOptions.build(), metrics); case DIRECT: - return new DirectRunnerJobManager(runnerConfigOptions, new DirectJobRegistry(), metrics); + DirectRunnerConfigOptions.Builder directRunnerConfigOptions = + DirectRunnerConfigOptions.newBuilder(); + JsonFormat.parser().merge(configJson, directRunnerConfigOptions); + return new DirectRunnerJobManager( + directRunnerConfigOptions.build(), new DirectJobRegistry(), metrics); default: throw new IllegalArgumentException("Unsupported runner: " + runner); } diff --git a/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java b/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java index 7b9df0abd5..2c3da255f5 100644 --- a/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java +++ b/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java @@ -33,12 +33,12 @@ import feast.core.job.Runner; import feast.core.job.option.FeatureSetJsonByteConverter; import feast.core.model.*; -import feast.core.util.TypeConversion; import feast.ingestion.ImportJob; import 
feast.ingestion.options.BZip2Compressor; import feast.ingestion.options.ImportOptions; import feast.ingestion.options.OptionCompressor; import feast.proto.core.FeatureSetProto; +import feast.proto.core.RunnerProto.DataflowRunnerConfigOptions; import feast.proto.core.SourceProto; import feast.proto.core.StoreProto; import java.io.IOException; @@ -46,7 +46,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Map; import lombok.extern.slf4j.Slf4j; import org.apache.beam.runners.dataflow.DataflowPipelineJob; import org.apache.beam.runners.dataflow.DataflowRunner; @@ -61,21 +60,20 @@ public class DataflowJobManager implements JobManager { private final String projectId; private final String location; private final Dataflow dataflow; - private final Map defaultOptions; + private final DataflowRunnerConfig defaultOptions; private final MetricsProperties metrics; public DataflowJobManager( - Map runnerConfigOptions, MetricsProperties metricsProperties) { + DataflowRunnerConfigOptions runnerConfigOptions, MetricsProperties metricsProperties) { this(runnerConfigOptions, metricsProperties, getGoogleCredential()); } public DataflowJobManager( - Map runnerConfigOptions, + DataflowRunnerConfigOptions runnerConfigOptions, MetricsProperties metricsProperties, Credential credential) { - DataflowRunnerConfig config = new DataflowRunnerConfig(runnerConfigOptions); - + defaultOptions = new DataflowRunnerConfig(runnerConfigOptions); Dataflow dataflow = null; try { dataflow = @@ -89,11 +87,10 @@ public DataflowJobManager( throw new IllegalStateException("Unable to initialize DataflowJobManager", e); } - this.defaultOptions = runnerConfigOptions; this.dataflow = dataflow; this.metrics = metricsProperties; - this.projectId = config.getProject(); - this.location = config.getRegion(); + this.projectId = defaultOptions.getProject(); + this.location = defaultOptions.getRegion(); } private static Credential getGoogleCredential() { @@ -270,9 
+267,9 @@ private ImportOptions getPipelineOptions( List featureSets, StoreProto.Store sink, boolean update) - throws IOException { - String[] args = TypeConversion.convertMapToArgs(defaultOptions); - ImportOptions pipelineOptions = PipelineOptionsFactory.fromArgs(args).as(ImportOptions.class); + throws IOException, IllegalAccessException { + ImportOptions pipelineOptions = + PipelineOptionsFactory.fromArgs(defaultOptions.toArgs()).as(ImportOptions.class); OptionCompressor> featureSetJsonCompressor = new BZip2Compressor<>(new FeatureSetJsonByteConverter()); diff --git a/core/src/main/java/feast/core/job/dataflow/DataflowRunnerConfig.java b/core/src/main/java/feast/core/job/dataflow/DataflowRunnerConfig.java index 6fe93ca80c..85628d2cd0 100644 --- a/core/src/main/java/feast/core/job/dataflow/DataflowRunnerConfig.java +++ b/core/src/main/java/feast/core/job/dataflow/DataflowRunnerConfig.java @@ -16,9 +16,9 @@ */ package feast.core.job.dataflow; -import java.lang.reflect.Field; -import java.util.Map; -import java.util.Set; +import feast.core.job.option.RunnerConfig; +import feast.proto.core.RunnerProto.DataflowRunnerConfigOptions; +import java.util.*; import javax.validation.*; import javax.validation.constraints.NotBlank; import lombok.Getter; @@ -27,77 +27,65 @@ /** DataflowRunnerConfig contains configuration fields for the Dataflow job runner. 
*/ @Getter @Setter -public class DataflowRunnerConfig { - - public DataflowRunnerConfig(Map runnerConfigOptions) { - - // Try to find all fields in DataflowRunnerConfig inside the runnerConfigOptions and map it into - // this object - for (Field field : DataflowRunnerConfig.class.getFields()) { - String fieldName = field.getName(); - try { - if (!runnerConfigOptions.containsKey(fieldName)) { - continue; - } - String value = runnerConfigOptions.get(fieldName); - - if (Boolean.class.equals(field.getType())) { - field.set(this, Boolean.valueOf(value)); - continue; - } - if (field.getType() == Integer.class) { - field.set(this, Integer.valueOf(value)); - continue; - } - field.set(this, value); - } catch (IllegalAccessException e) { - throw new RuntimeException( - String.format( - "Could not successfully convert DataflowRunnerConfig for key: %s", fieldName), - e); - } - } +public class DataflowRunnerConfig extends RunnerConfig { + + public DataflowRunnerConfig(DataflowRunnerConfigOptions runnerConfigOptions) { + this.project = runnerConfigOptions.getProject(); + this.region = runnerConfigOptions.getRegion(); + this.zone = runnerConfigOptions.getZone(); + this.serviceAccount = runnerConfigOptions.getServiceAccount(); + this.network = runnerConfigOptions.getNetwork(); + this.subnetwork = runnerConfigOptions.getSubnetwork(); + this.workerMachineType = runnerConfigOptions.getWorkerMachineType(); + this.autoscalingAlgorithm = runnerConfigOptions.getAutoscalingAlgorithm(); + this.usePublicIps = runnerConfigOptions.getUsePublicIps(); + this.tempLocation = runnerConfigOptions.getTempLocation(); + this.maxNumWorkers = runnerConfigOptions.getMaxNumWorkers(); + this.deadLetterTableSpec = runnerConfigOptions.getDeadLetterTableSpec(); + this.labels = runnerConfigOptions.getLabelsMap(); validate(); } /* Project id to use when launching jobs. */ - @NotBlank public String project; + @NotBlank String project; /* The Google Compute Engine region for creating Dataflow jobs. 
*/ - @NotBlank public String region; + @NotBlank String region; /* GCP availability zone for operations. */ - @NotBlank public String zone; + @NotBlank String zone; /* Run the job as a specific service account, instead of the default GCE robot. */ - public String serviceAccount; + String serviceAccount; /* GCE network for launching workers. */ - @NotBlank public String network; + @NotBlank String network; /* GCE subnetwork for launching workers. */ - @NotBlank public String subnetwork; + @NotBlank String subnetwork; /* Machine type to create Dataflow worker VMs as. */ - public String workerMachineType; + String workerMachineType; /* The autoscaling algorithm to use for the workerpool. */ - public String autoscalingAlgorithm; + String autoscalingAlgorithm; /* Specifies whether worker pools should be started with public IP addresses. */ - public Boolean usePublicIps; + Boolean usePublicIps; /** * A pipeline level default location for storing temporary files. Support Google Cloud Storage * locations, e.g. gs://bucket/object */ - @NotBlank public String tempLocation; + @NotBlank String tempLocation; /* The maximum number of workers to use for the workerpool. */ - public Integer maxNumWorkers; + Integer maxNumWorkers; /* BigQuery table specification, e.g. PROJECT_ID:DATASET_ID.PROJECT_ID */ - public String deadLetterTableSpec; + String deadLetterTableSpec; + + Map labels; /** Validates Dataflow runner configuration options */ public void validate() { diff --git a/core/src/main/java/feast/core/job/direct/DirectRunnerConfig.java b/core/src/main/java/feast/core/job/direct/DirectRunnerConfig.java new file mode 100644 index 0000000000..ebd327f2f7 --- /dev/null +++ b/core/src/main/java/feast/core/job/direct/DirectRunnerConfig.java @@ -0,0 +1,36 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.core.job.direct; + +import feast.core.job.option.RunnerConfig; +import feast.proto.core.RunnerProto.DirectRunnerConfigOptions; + +public class DirectRunnerConfig extends RunnerConfig { + /** + * Controls the amount of target parallelism the DirectRunner will use. Defaults to the greater of + * the number of available processors and 3. Must be a value greater than zero. + */ + Integer targetParallelism; + + /* BigQuery table specification, e.g. PROJECT_ID:DATASET_ID.TABLE_ID */ + String deadletterTableSpec; + + public DirectRunnerConfig(DirectRunnerConfigOptions runnerConfigOptions) { + this.deadletterTableSpec = runnerConfigOptions.getDeadLetterTableSpec(); + this.targetParallelism = runnerConfigOptions.getTargetParallelism(); + } +} diff --git a/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java b/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java index 2e2b43047e..715adbdd43 100644 --- a/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java +++ b/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java @@ -27,18 +27,17 @@ import feast.core.model.Job; import feast.core.model.JobStatus; import feast.core.model.Project; -import feast.core.util.TypeConversion; import feast.ingestion.ImportJob; import feast.ingestion.options.BZip2Compressor; import feast.ingestion.options.ImportOptions; import feast.ingestion.options.OptionCompressor; import feast.proto.core.FeatureSetProto; +import feast.proto.core.RunnerProto.DirectRunnerConfigOptions; import 
feast.proto.core.StoreProto; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Map; import lombok.extern.slf4j.Slf4j; import org.apache.beam.runners.direct.DirectRunner; import org.apache.beam.sdk.PipelineResult; @@ -49,15 +48,15 @@ public class DirectRunnerJobManager implements JobManager { private final Runner RUNNER_TYPE = Runner.DIRECT; - protected Map defaultOptions; + private DirectRunnerConfig defaultOptions; private final DirectJobRegistry jobs; private MetricsProperties metrics; public DirectRunnerJobManager( - Map defaultOptions, + DirectRunnerConfigOptions directRunnerConfigOptions, DirectJobRegistry jobs, MetricsProperties metricsProperties) { - this.defaultOptions = defaultOptions; + this.defaultOptions = new DirectRunnerConfig(directRunnerConfigOptions); this.jobs = jobs; this.metrics = metricsProperties; } @@ -95,9 +94,9 @@ public Job startJob(Job job) { private ImportOptions getPipelineOptions( String jobName, List featureSets, StoreProto.Store sink) - throws IOException { - String[] args = TypeConversion.convertMapToArgs(defaultOptions); - ImportOptions pipelineOptions = PipelineOptionsFactory.fromArgs(args).as(ImportOptions.class); + throws IOException, IllegalAccessException { + ImportOptions pipelineOptions = + PipelineOptionsFactory.fromArgs(defaultOptions.toArgs()).as(ImportOptions.class); OptionCompressor> featureSetJsonCompressor = new BZip2Compressor<>(new FeatureSetJsonByteConverter()); diff --git a/core/src/main/java/feast/core/job/option/RunnerConfig.java b/core/src/main/java/feast/core/job/option/RunnerConfig.java new file mode 100644 index 0000000000..4b937074a3 --- /dev/null +++ b/core/src/main/java/feast/core/job/option/RunnerConfig.java @@ -0,0 +1,75 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in 
compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.core.job.option; + +import feast.core.util.TypeConversion; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * Value class containing the application-default configuration for a runner. When a job is started + * by core, all fields in the object will be converted into --key=value args to seed the beam + * pipeline options. + */ +public abstract class RunnerConfig { + + /** + * Converts the fields in this class to a list of --key=value args to be passed to a {@link + * org.apache.beam.sdk.options.PipelineOptionsFactory}. + * + *
<p>
Ignores values that are proto-default (e.g. empty string, 0). + * + * @return Array of string args in the format --key=value. + * @throws IllegalAccessException + */ + public String[] toArgs() throws IllegalAccessException { + List args = new ArrayList<>(); + for (Field field : this.getClass().getFields()) { + if (field.get(this) == null) { + continue; + } + Class type = field.getType(); + if (Map.class.equals(type)) { + String jsonString = + TypeConversion.convertMapToJsonString((Map) field.get(this)); + args.add(String.format("--%s=%s", field.getName(), jsonString)); + continue; + } + + if (String.class.equals(type)) { + String val = (String) field.get(this); + if (!val.equals("")) { + args.add(String.format("--%s=%s", field.getName(), val)); + } + continue; + } + + if (Integer.class.equals(type)) { + Integer val = (Integer) field.get(this); + if (val != 0) { + args.add(String.format("--%s=%d", field.getName(), val)); + } + continue; + } + + args.add(String.format("--%s=%s", field.getName(), field.get(this))); + } + return args.toArray(String[]::new); + } +} diff --git a/core/src/main/java/feast/core/util/TypeConversion.java b/core/src/main/java/feast/core/util/TypeConversion.java index 6ee990fc1c..e6b7ef33cb 100644 --- a/core/src/main/java/feast/core/util/TypeConversion.java +++ b/core/src/main/java/feast/core/util/TypeConversion.java @@ -16,12 +16,10 @@ */ package feast.core.util; -import com.google.common.base.Strings; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; import java.lang.reflect.Type; import java.util.*; -import java.util.Map.Entry; public class TypeConversion { private static Gson gson = new Gson(); @@ -72,18 +70,4 @@ public static Map convertJsonStringToMap(String jsonString) { public static String convertMapToJsonString(Map map) { return gson.toJson(map); } - - /** - * Convert a map of key value pairs to a array of java arguments in format --key=value - * - * @param map - * @return array of string arguments - */ - public 
static String[] convertMapToArgs(Map map) { - List args = new ArrayList<>(); - for (Entry arg : map.entrySet()) { - args.add(Strings.lenientFormat("--%s=%s", arg.getKey(), arg.getValue())); - } - return args.toArray(new String[] {}); - } } diff --git a/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java b/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java index 632c9d22a2..ea9caa91ff 100644 --- a/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java +++ b/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java @@ -40,6 +40,8 @@ import feast.proto.core.FeatureSetProto; import feast.proto.core.FeatureSetProto.FeatureSetMeta; import feast.proto.core.FeatureSetProto.FeatureSetSpec; +import feast.proto.core.RunnerProto.DataflowRunnerConfigOptions; +import feast.proto.core.RunnerProto.DataflowRunnerConfigOptions.Builder; import feast.proto.core.SourceProto; import feast.proto.core.SourceProto.KafkaSourceConfig; import feast.proto.core.SourceProto.SourceType; @@ -49,9 +51,7 @@ import feast.proto.core.StoreProto.Store.Subscription; import java.io.IOException; import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; import org.apache.beam.runners.dataflow.DataflowPipelineJob; import org.apache.beam.runners.dataflow.DataflowRunner; import org.apache.beam.sdk.PipelineResult.State; @@ -70,19 +70,21 @@ public class DataflowJobManagerTest { @Mock private Dataflow dataflow; - private Map defaults; + private DataflowRunnerConfigOptions defaults; private DataflowJobManager dfJobManager; @Before public void setUp() { initMocks(this); - defaults = new HashMap<>(); - defaults.put("project", "project"); - defaults.put("region", "region"); - defaults.put("zone", "zone"); - defaults.put("tempLocation", "tempLocation"); - defaults.put("network", "network"); - defaults.put("subnetwork", "subnetwork"); + Builder optionsBuilder = DataflowRunnerConfigOptions.newBuilder(); + 
optionsBuilder.setProject("project"); + optionsBuilder.setRegion("region"); + optionsBuilder.setZone("zone"); + optionsBuilder.setTempLocation("tempLocation"); + optionsBuilder.setNetwork("network"); + optionsBuilder.setSubnetwork("subnetwork"); + optionsBuilder.putLabels("orchestrator", "feast"); + defaults = optionsBuilder.build(); MetricsProperties metricsProperties = new MetricsProperties(); metricsProperties.setEnabled(false); Credential credential = null; @@ -137,6 +139,7 @@ public void shouldStartJobWithCorrectPipelineOptions() throws IOException { expectedPipelineOptions.setRegion("region"); expectedPipelineOptions.setUpdate(false); expectedPipelineOptions.setAppName("DataflowJobManager"); + expectedPipelineOptions.setLabels(defaults.getLabelsMap()); expectedPipelineOptions.setJobName(jobName); expectedPipelineOptions.setStoreJson(Lists.newArrayList(printer.print(store))); diff --git a/core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java b/core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java index 42d3189a73..0128f5aa0b 100644 --- a/core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java +++ b/core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java @@ -42,6 +42,7 @@ import feast.ingestion.options.OptionCompressor; import feast.proto.core.FeatureSetProto; import feast.proto.core.FeatureSetProto.FeatureSetSpec; +import feast.proto.core.RunnerProto.DirectRunnerConfigOptions; import feast.proto.core.SourceProto; import feast.proto.core.SourceProto.KafkaSourceConfig; import feast.proto.core.SourceProto.SourceType; @@ -51,9 +52,7 @@ import feast.proto.core.StoreProto.Store.Subscription; import java.io.IOException; import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; import org.apache.beam.runners.direct.DirectRunner; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -71,12 +70,12 @@ 
public class DirectRunnerJobManagerTest { @Mock private DirectJobRegistry directJobRegistry; private DirectRunnerJobManager drJobManager; - private Map defaults; + private DirectRunnerConfigOptions defaults; @Before public void setUp() { initMocks(this); - defaults = new HashMap<>(); + defaults = DirectRunnerConfigOptions.newBuilder().setTargetParallelism(1).build(); MetricsProperties metricsProperties = new MetricsProperties(); metricsProperties.setEnabled(false); @@ -123,7 +122,7 @@ public void shouldStartDirectJobAndRegisterPipelineResult() throws IOException { expectedPipelineOptions.setAppName("DirectRunnerJobManager"); expectedPipelineOptions.setRunner(DirectRunner.class); expectedPipelineOptions.setBlockOnRun(false); - expectedPipelineOptions.setProject(""); + expectedPipelineOptions.setTargetParallelism(1); expectedPipelineOptions.setStoreJson(Lists.newArrayList(printer.print(store))); expectedPipelineOptions.setProject(""); diff --git a/core/src/test/java/feast/core/util/TypeConversionTest.java b/core/src/test/java/feast/core/util/TypeConversionTest.java index 02f0a7cee4..c44bf50129 100644 --- a/core/src/test/java/feast/core/util/TypeConversionTest.java +++ b/core/src/test/java/feast/core/util/TypeConversionTest.java @@ -74,16 +74,4 @@ public void convertMapToJsonStringShouldReturnEmptyJsonForAnEmptyMap() { Map input = new HashMap<>(); assertThat(TypeConversion.convertMapToJsonString(input), equalTo("{}")); } - - @Test - public void convertJsonStringToArgsShouldReturnCorrectListOfArgs() { - Map input = new HashMap<>(); - input.put("key", "value"); - input.put("key2", "value2"); - - String[] expected = new String[] {"--key=value", "--key2=value2"}; - String[] actual = TypeConversion.convertMapToArgs(input); - assertThat(actual.length, equalTo(expected.length)); - assertTrue(Arrays.asList(actual).containsAll(Arrays.asList(expected))); - } } diff --git a/protos/feast/core/Runner.proto b/protos/feast/core/Runner.proto index 544972286d..91c1e99485 100644 --- 
a/protos/feast/core/Runner.proto +++ b/protos/feast/core/Runner.proto @@ -70,4 +70,7 @@ message DataflowRunnerConfigOptions { /* BigQuery table specification, e.g. PROJECT_ID:DATASET_ID.PROJECT_ID */ string deadLetterTableSpec = 12; + + /* Labels to apply to the dataflow job */ + map labels = 13; } \ No newline at end of file