Refactor runner configuration, add labels to dataflow options (#718)
* Refactor runner configuration, add labels to dataflow options

* Remove util method

* Add javadocs for RunnerConfig class

* Apply spotless
Chen Zhiling authored Jun 1, 2020 · 1 parent a5c6dce · commit f9ce8ee
Showing 12 changed files with 202 additions and 114 deletions.
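The headline change is a new map<string, string> labels field on DataflowRunnerConfigOptions, plumbed through to the Dataflow runner config below. A minimal sketch of building the new options proto directly; the project, region, and label values here are hypothetical:

    import feast.proto.core.RunnerProto.DataflowRunnerConfigOptions;

    public class LabelsExample {
      public static void main(String[] args) {
        // putLabels is the generated adder for the new map<string, string> field.
        DataflowRunnerConfigOptions options =
            DataflowRunnerConfigOptions.newBuilder()
                .setProject("my-gcp-project") // hypothetical value
                .setRegion("us-central1") // hypothetical value
                .putLabels("team", "feature-engineering") // hypothetical label
                .build();
        System.out.println(options.getLabelsMap());
      }
    }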
2 changes: 1 addition & 1 deletion core/src/main/java/feast/core/config/FeastProperties.java
@@ -103,7 +103,7 @@ public static class Runner {
      * Job runner configuration options. See the following for options
      * https://api.docs.feast.dev/grpc/feast.core.pb.html#Runner
      */
-    Map<String, String> options = new HashMap<>();
+    Map<String, Object> options = new HashMap<>();

     /**
      * Gets the job runner type as an enum.
24 changes: 20 additions & 4 deletions core/src/main/java/feast/core/config/JobConfig.java
@@ -16,11 +16,16 @@
  */
 package feast.core.config;

+import com.google.gson.Gson;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.util.JsonFormat;
 import feast.core.config.FeastProperties.JobProperties;
 import feast.core.job.JobManager;
 import feast.core.job.dataflow.DataflowJobManager;
 import feast.core.job.direct.DirectJobRegistry;
 import feast.core.job.direct.DirectRunnerJobManager;
+import feast.proto.core.RunnerProto.DataflowRunnerConfigOptions;
+import feast.proto.core.RunnerProto.DirectRunnerConfigOptions;
 import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -31,6 +36,7 @@
 @Slf4j
 @Configuration
 public class JobConfig {
+  private final Gson gson = new Gson();

   /**
    * Get a JobManager according to the runner type and Dataflow configuration.
@@ -39,18 +45,28 @@ public class JobConfig {
    */
   @Bean
   @Autowired
-  public JobManager getJobManager(FeastProperties feastProperties) {
+  public JobManager getJobManager(FeastProperties feastProperties)
+      throws InvalidProtocolBufferException {
+
     JobProperties jobProperties = feastProperties.getJobs();
     FeastProperties.JobProperties.Runner runner = jobProperties.getActiveRunner();
-    Map<String, String> runnerConfigOptions = runner.getOptions();
+    Map<String, Object> runnerConfigOptions = runner.getOptions();
+    String configJson = gson.toJson(runnerConfigOptions);

     FeastProperties.MetricsProperties metrics = jobProperties.getMetrics();

     switch (runner.getType()) {
       case DATAFLOW:
-        return new DataflowJobManager(runnerConfigOptions, metrics);
+        DataflowRunnerConfigOptions.Builder dataflowRunnerConfigOptions =
+            DataflowRunnerConfigOptions.newBuilder();
+        JsonFormat.parser().merge(configJson, dataflowRunnerConfigOptions);
+        return new DataflowJobManager(dataflowRunnerConfigOptions.build(), metrics);
       case DIRECT:
-        return new DirectRunnerJobManager(runnerConfigOptions, new DirectJobRegistry(), metrics);
+        DirectRunnerConfigOptions.Builder directRunnerConfigOptions =
+            DirectRunnerConfigOptions.newBuilder();
+        JsonFormat.parser().merge(configJson, directRunnerConfigOptions);
+        return new DirectRunnerJobManager(
+            directRunnerConfigOptions.build(), new DirectJobRegistry(), metrics);
       default:
         throw new IllegalArgumentException("Unsupported runner: " + runner);
     }
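The pattern above is the core of the refactor: the untyped options map from configuration is serialized to JSON with Gson, then merged into the runner-specific protobuf builder, so the proto schema now enforces option names and types that the old string map left implicit. A runnable sketch of the same round trip, with hypothetical option values:

    import com.google.gson.Gson;
    import com.google.protobuf.InvalidProtocolBufferException;
    import com.google.protobuf.util.JsonFormat;
    import feast.proto.core.RunnerProto.DataflowRunnerConfigOptions;
    import java.util.HashMap;
    import java.util.Map;

    public class RunnerOptionsRoundTrip {
      public static void main(String[] args) throws InvalidProtocolBufferException {
        Map<String, Object> options = new HashMap<>();
        options.put("project", "my-gcp-project"); // hypothetical values
        options.put("region", "us-central1");
        options.put("maxNumWorkers", 4);
        Map<String, String> labels = new HashMap<>();
        labels.put("team", "feature-engineering");
        options.put("labels", labels); // a nested map is why the value type widened to Object

        // The same two steps getJobManager performs: Map -> JSON -> proto.
        String configJson = new Gson().toJson(options);
        DataflowRunnerConfigOptions.Builder builder = DataflowRunnerConfigOptions.newBuilder();
        JsonFormat.parser().merge(configJson, builder);
        System.out.println(builder.build());
      }
    }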
23 changes: 10 additions & 13 deletions core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java
@@ -33,20 +33,19 @@
 import feast.core.job.Runner;
 import feast.core.job.option.FeatureSetJsonByteConverter;
 import feast.core.model.*;
-import feast.core.util.TypeConversion;
 import feast.ingestion.ImportJob;
 import feast.ingestion.options.BZip2Compressor;
 import feast.ingestion.options.ImportOptions;
 import feast.ingestion.options.OptionCompressor;
 import feast.proto.core.FeatureSetProto;
+import feast.proto.core.RunnerProto.DataflowRunnerConfigOptions;
 import feast.proto.core.SourceProto;
 import feast.proto.core.StoreProto;
 import java.io.IOException;
 import java.security.GeneralSecurityException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.beam.runners.dataflow.DataflowPipelineJob;
 import org.apache.beam.runners.dataflow.DataflowRunner;
@@ -61,21 +60,20 @@ public class DataflowJobManager implements JobManager {
   private final String projectId;
   private final String location;
   private final Dataflow dataflow;
-  private final Map<String, String> defaultOptions;
+  private final DataflowRunnerConfig defaultOptions;
   private final MetricsProperties metrics;

   public DataflowJobManager(
-      Map<String, String> runnerConfigOptions, MetricsProperties metricsProperties) {
+      DataflowRunnerConfigOptions runnerConfigOptions, MetricsProperties metricsProperties) {
     this(runnerConfigOptions, metricsProperties, getGoogleCredential());
   }

   public DataflowJobManager(
-      Map<String, String> runnerConfigOptions,
+      DataflowRunnerConfigOptions runnerConfigOptions,
       MetricsProperties metricsProperties,
       Credential credential) {

-    DataflowRunnerConfig config = new DataflowRunnerConfig(runnerConfigOptions);
-
+    defaultOptions = new DataflowRunnerConfig(runnerConfigOptions);
     Dataflow dataflow = null;
     try {
       dataflow =
@@ -89,11 +87,10 @@ public DataflowJobManager(
       throw new IllegalStateException("Unable to initialize DataflowJobManager", e);
     }

-    this.defaultOptions = runnerConfigOptions;
     this.dataflow = dataflow;
     this.metrics = metricsProperties;
-    this.projectId = config.getProject();
-    this.location = config.getRegion();
+    this.projectId = defaultOptions.getProject();
+    this.location = defaultOptions.getRegion();
   }

   private static Credential getGoogleCredential() {
@@ -270,9 +267,9 @@ private ImportOptions getPipelineOptions(
       List<FeatureSetProto.FeatureSet> featureSets,
       StoreProto.Store sink,
       boolean update)
-      throws IOException {
-    String[] args = TypeConversion.convertMapToArgs(defaultOptions);
-    ImportOptions pipelineOptions = PipelineOptionsFactory.fromArgs(args).as(ImportOptions.class);
+      throws IOException, IllegalAccessException {
+    ImportOptions pipelineOptions =
+        PipelineOptionsFactory.fromArgs(defaultOptions.toArgs()).as(ImportOptions.class);

     OptionCompressor<List<FeatureSetProto.FeatureSet>> featureSetJsonCompressor =
         new BZip2Compressor<>(new FeatureSetJsonByteConverter());
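defaultOptions.toArgs() replaces TypeConversion.convertMapToArgs, but the RunnerConfig base class (feast.core.job.option.RunnerConfig) is not among the hunks shown here. Judging from the IllegalAccessException added to the throws clauses, it plausibly reflects over the config's fields to build --name=value pipeline arguments. The following is a hedged sketch of that idea, not the actual Feast implementation:

    import java.lang.reflect.Field;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    public abstract class RunnerConfig {
      /** Converts non-null config fields into --name=value args for PipelineOptionsFactory.fromArgs. */
      public String[] toArgs() throws IllegalAccessException {
        List<String> args = new ArrayList<>();
        for (Field field : this.getClass().getDeclaredFields()) {
          Object value = field.get(this); // throws IllegalAccessException if the field is inaccessible
          if (value == null) {
            continue;
          }
          if (value instanceof Map) {
            continue; // assumption: map options such as labels need their own encoding, skipped here
          }
          args.add(String.format("--%s=%s", field.getName(), value));
        }
        return args.toArray(new String[0]);
      }
    }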
core/src/main/java/feast/core/job/dataflow/DataflowRunnerConfig.java
@@ -16,9 +16,9 @@
  */
 package feast.core.job.dataflow;

-import java.lang.reflect.Field;
-import java.util.Map;
-import java.util.Set;
+import feast.core.job.option.RunnerConfig;
+import feast.proto.core.RunnerProto.DataflowRunnerConfigOptions;
+import java.util.*;
 import javax.validation.*;
 import javax.validation.constraints.NotBlank;
 import lombok.Getter;
@@ -27,77 +27,65 @@
 /** DataflowRunnerConfig contains configuration fields for the Dataflow job runner. */
 @Getter
 @Setter
-public class DataflowRunnerConfig {
-
-  public DataflowRunnerConfig(Map<String, String> runnerConfigOptions) {
-
-    // Try to find all fields in DataflowRunnerConfig inside the runnerConfigOptions and map it into
-    // this object
-    for (Field field : DataflowRunnerConfig.class.getFields()) {
-      String fieldName = field.getName();
-      try {
-        if (!runnerConfigOptions.containsKey(fieldName)) {
-          continue;
-        }
-        String value = runnerConfigOptions.get(fieldName);
-
-        if (Boolean.class.equals(field.getType())) {
-          field.set(this, Boolean.valueOf(value));
-          continue;
-        }
-        if (field.getType() == Integer.class) {
-          field.set(this, Integer.valueOf(value));
-          continue;
-        }
-        field.set(this, value);
-      } catch (IllegalAccessException e) {
-        throw new RuntimeException(
-            String.format(
-                "Could not successfully convert DataflowRunnerConfig for key: %s", fieldName),
-            e);
-      }
-    }
+public class DataflowRunnerConfig extends RunnerConfig {
+
+  public DataflowRunnerConfig(DataflowRunnerConfigOptions runnerConfigOptions) {
+    this.project = runnerConfigOptions.getProject();
+    this.region = runnerConfigOptions.getRegion();
+    this.zone = runnerConfigOptions.getZone();
+    this.serviceAccount = runnerConfigOptions.getServiceAccount();
+    this.network = runnerConfigOptions.getNetwork();
+    this.subnetwork = runnerConfigOptions.getSubnetwork();
+    this.workerMachineType = runnerConfigOptions.getWorkerMachineType();
+    this.autoscalingAlgorithm = runnerConfigOptions.getAutoscalingAlgorithm();
+    this.usePublicIps = runnerConfigOptions.getUsePublicIps();
+    this.tempLocation = runnerConfigOptions.getTempLocation();
+    this.maxNumWorkers = runnerConfigOptions.getMaxNumWorkers();
+    this.deadLetterTableSpec = runnerConfigOptions.getDeadLetterTableSpec();
+    this.labels = runnerConfigOptions.getLabelsMap();
+    validate();
   }

   /* Project id to use when launching jobs. */
-  @NotBlank public String project;
+  @NotBlank String project;

   /* The Google Compute Engine region for creating Dataflow jobs. */
-  @NotBlank public String region;
+  @NotBlank String region;

   /* GCP availability zone for operations. */
-  @NotBlank public String zone;
+  @NotBlank String zone;

   /* Run the job as a specific service account, instead of the default GCE robot. */
-  public String serviceAccount;
+  String serviceAccount;

   /* GCE network for launching workers. */
-  @NotBlank public String network;
+  @NotBlank String network;

   /* GCE subnetwork for launching workers. */
-  @NotBlank public String subnetwork;
+  @NotBlank String subnetwork;

   /* Machine type to create Dataflow worker VMs as. */
-  public String workerMachineType;
+  String workerMachineType;

   /* The autoscaling algorithm to use for the workerpool. */
-  public String autoscalingAlgorithm;
+  String autoscalingAlgorithm;

   /* Specifies whether worker pools should be started with public IP addresses. */
-  public Boolean usePublicIps;
+  Boolean usePublicIps;

   /**
    * A pipeline level default location for storing temporary files. Support Google Cloud Storage
    * locations, e.g. gs://bucket/object
    */
-  @NotBlank public String tempLocation;
+  @NotBlank String tempLocation;

   /* The maximum number of workers to use for the workerpool. */
-  public Integer maxNumWorkers;
+  Integer maxNumWorkers;

   /* BigQuery table specification, e.g. PROJECT_ID:DATASET_ID.PROJECT_ID */
-  public String deadLetterTableSpec;
+  String deadLetterTableSpec;

+  Map<String, String> labels;

   /** Validates Dataflow runner configuration options */
   public void validate() {
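The validate() body is truncated above. Given the javax.validation imports and the constraint annotations on the fields, a plausible implementation (an assumption, not the commit's actual body) is the standard programmatic Bean Validation call, placed inside DataflowRunnerConfig:

    import java.util.Set;
    import javax.validation.ConstraintViolation;
    import javax.validation.ConstraintViolationException;
    import javax.validation.Validation;
    import javax.validation.Validator;

    /** Validates Dataflow runner configuration options. */
    public void validate() {
      Validator validator = Validation.buildDefaultValidatorFactory().getValidator();
      Set<ConstraintViolation<DataflowRunnerConfig>> violations = validator.validate(this);
      if (!violations.isEmpty()) {
        throw new ConstraintViolationException(violations);
      }
    }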
36 changes: 36 additions & 0 deletions core/src/main/java/feast/core/job/direct/DirectRunnerConfig.java
@@ -0,0 +1,36 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2020 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.core.job.direct;

import feast.core.job.option.RunnerConfig;
import feast.proto.core.RunnerProto.DirectRunnerConfigOptions;

public class DirectRunnerConfig extends RunnerConfig {
/**
* Controls the amount of target parallelism the DirectRunner will use. Defaults to the greater of
* the number of available processors and 3. Must be a value greater than zero.
*/
Integer targetParallelism;

/* BigQuery table specification, e.g. PROJECT_ID:DATASET_ID.PROJECT_ID */
String deadletterTableSpec;

public DirectRunnerConfig(DirectRunnerConfigOptions runnerConfigOptions) {
this.deadletterTableSpec = runnerConfigOptions.getDeadLetterTableSpec();
this.targetParallelism = runnerConfigOptions.getTargetParallelism();
}
}
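A short usage sketch pairing the new class with its generated proto builder; the parallelism and table-spec values are hypothetical:

    import feast.proto.core.RunnerProto.DirectRunnerConfigOptions;

    public class DirectRunnerConfigExample {
      public static void main(String[] args) throws IllegalAccessException {
        DirectRunnerConfigOptions options =
            DirectRunnerConfigOptions.newBuilder()
                .setTargetParallelism(1) // hypothetical value
                .setDeadLetterTableSpec("my-project:dataset.deadletters") // hypothetical value
                .build();
        DirectRunnerConfig config = new DirectRunnerConfig(options);
        // DirectRunnerJobManager later expands this into pipeline args via toArgs().
        System.out.println(String.join(" ", config.toArgs()));
      }
    }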
core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java
@@ -27,18 +27,17 @@
 import feast.core.model.Job;
 import feast.core.model.JobStatus;
 import feast.core.model.Project;
-import feast.core.util.TypeConversion;
 import feast.ingestion.ImportJob;
 import feast.ingestion.options.BZip2Compressor;
 import feast.ingestion.options.ImportOptions;
 import feast.ingestion.options.OptionCompressor;
 import feast.proto.core.FeatureSetProto;
+import feast.proto.core.RunnerProto.DirectRunnerConfigOptions;
 import feast.proto.core.StoreProto;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.beam.runners.direct.DirectRunner;
 import org.apache.beam.sdk.PipelineResult;
@@ -49,15 +48,15 @@ public class DirectRunnerJobManager implements JobManager {

   private final Runner RUNNER_TYPE = Runner.DIRECT;

-  protected Map<String, String> defaultOptions;
+  private DirectRunnerConfig defaultOptions;
   private final DirectJobRegistry jobs;
   private MetricsProperties metrics;

   public DirectRunnerJobManager(
-      Map<String, String> defaultOptions,
+      DirectRunnerConfigOptions directRunnerConfigOptions,
       DirectJobRegistry jobs,
       MetricsProperties metricsProperties) {
-    this.defaultOptions = defaultOptions;
+    this.defaultOptions = new DirectRunnerConfig(directRunnerConfigOptions);
     this.jobs = jobs;
     this.metrics = metricsProperties;
   }
@@ -95,9 +94,9 @@ public Job startJob(Job job) {

   private ImportOptions getPipelineOptions(
       String jobName, List<FeatureSetProto.FeatureSet> featureSets, StoreProto.Store sink)
-      throws IOException {
-    String[] args = TypeConversion.convertMapToArgs(defaultOptions);
-    ImportOptions pipelineOptions = PipelineOptionsFactory.fromArgs(args).as(ImportOptions.class);
+      throws IOException, IllegalAccessException {
+    ImportOptions pipelineOptions =
+        PipelineOptionsFactory.fromArgs(defaultOptions.toArgs()).as(ImportOptions.class);

     OptionCompressor<List<FeatureSetProto.FeatureSet>> featureSetJsonCompressor =
         new BZip2Compressor<>(new FeatureSetJsonByteConverter());