Skip to content

Commit

Permalink
Apply default project to rows without project during ingestion (#701)
Browse files Browse the repository at this point in the history
* Apply default project to incoming rows without project defined

* Apply spotless

* Fix broken test
  • Loading branch information
Chen Zhiling authored May 19, 2020
1 parent c1a5c43 commit d8459e0
Show file tree
Hide file tree
Showing 9 changed files with 87 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ private ImportOptions getPipelineOptions(
pipelineOptions.setFeatureSetJson(featureSetJsonCompressor.compress(featureSets));
pipelineOptions.setStoreJson(Collections.singletonList(JsonFormat.printer().print(sink)));
pipelineOptions.setProject(projectId);
pipelineOptions.setDefaultFeastProject(Project.DEFAULT_NAME);
pipelineOptions.setUpdate(update);
pipelineOptions.setRunner(DataflowRunner.class);
pipelineOptions.setJobName(jobName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import feast.core.model.FeatureSet;
import feast.core.model.Job;
import feast.core.model.JobStatus;
import feast.core.model.Project;
import feast.core.util.TypeConversion;
import feast.ingestion.ImportJob;
import feast.ingestion.options.BZip2Compressor;
Expand Down Expand Up @@ -105,6 +106,7 @@ private ImportOptions getPipelineOptions(
pipelineOptions.setJobName(jobName);
pipelineOptions.setStoreJson(Collections.singletonList(JsonFormat.printer().print(sink)));
pipelineOptions.setRunner(DirectRunner.class);
pipelineOptions.setDefaultFeastProject(Project.DEFAULT_NAME);
pipelineOptions.setProject(""); // set to default value to satisfy validation
if (metrics.isEnabled()) {
pipelineOptions.setMetricsExporterType(metrics.getType());
Expand Down
1 change: 1 addition & 0 deletions core/src/main/java/feast/core/model/Project.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
@Entity
@Table(name = "projects")
public class Project {
public static final String DEFAULT_NAME = "default";

// Name of the project
@Id
Expand Down
1 change: 1 addition & 0 deletions ingestion/src/main/java/feast/ingestion/ImportJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ public static PipelineResult runPipeline(ImportOptions options) throws IOExcepti
.get(FEATURE_ROW_OUT)
.apply(
ProcessAndValidateFeatureRows.newBuilder()
.setDefaultProject(options.getDefaultFeastProject())
.setFeatureSetSpecs(featureSetSpecsByKey)
.setSuccessTag(FEATURE_ROW_OUT)
.setFailureTag(DEADLETTER_OUT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@
/** Options passed to Beam to influence the job's execution environment */
public interface ImportOptions extends PipelineOptions, DataflowPipelineOptions, DirectOptions {

/**
 * Feast project to apply to incoming rows whose feature set reference does not specify a project.
 *
 * <p>NOTE(review): annotated {@code @Required}; presumably this is Beam's
 * {@code Validation.Required}, which would make option validation reject a missing value.
 * Confirm against this file's imports.
 */
@Required
@Description(
"Default feast project to apply to incoming rows that do not specify project in its feature set reference.")
String getDefaultFeastProject();

/** Setter paired with {@link #getDefaultFeastProject()}. */
void setDefaultFeastProject(String defaultProject);

@Required
@Description(
"JSON string representation of the FeatureSet that the import job will process, in BZip2 binary format."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public abstract class ProcessAndValidateFeatureRows

public abstract Map<String, FeatureSetProto.FeatureSetSpec> getFeatureSetSpecs();

public abstract String getDefaultProject();

public abstract TupleTag<FeatureRow> getSuccessTag();

public abstract TupleTag<FailedElement> getFailureTag();
Expand All @@ -53,6 +55,8 @@ public abstract static class Builder {
public abstract Builder setFeatureSetSpecs(
Map<String, FeatureSetProto.FeatureSetSpec> featureSets);

public abstract Builder setDefaultProject(String defaultProject);

public abstract Builder setSuccessTag(TupleTag<FeatureRow> successTag);

public abstract Builder setFailureTag(TupleTag<FailedElement> failureTag);
Expand All @@ -69,7 +73,7 @@ public PCollectionTuple expand(PCollection<FeatureRow> input) {
.collect(Collectors.toMap(Pair::getLeft, Pair::getRight));

return input
.apply("ProcessFeatureRows", ParDo.of(new ProcessFeatureRowDoFn()))
.apply("ProcessFeatureRows", ParDo.of(new ProcessFeatureRowDoFn(getDefaultProject())))
.apply(
"ValidateFeatureRows",
ParDo.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,18 @@

public class ProcessFeatureRowDoFn extends DoFn<FeatureRow, FeatureRow> {

/** Project name prepended to feature set references that carry no project part. */
private final String defaultProject;

/**
 * Creates the row-processing function.
 *
 * @param defaultProject project name to prefix onto feature set references that do not contain
 *     a {@code /}-separated project
 */
public ProcessFeatureRowDoFn(String defaultProject) {
  this.defaultProject = defaultProject;
}

/**
 * Normalises the feature set reference of one incoming row and emits the updated row.
 *
 * <p>The reference is first reduced to the text before its first {@code :} (the version suffix
 * is dropped), then prefixed with the default project when it has no project part. The row is
 * rebuilt once with the resulting reference.
 *
 * @param context Beam process context supplying the input row and receiving the output row
 */
@ProcessElement
public void processElement(ProcessContext context) {
  FeatureRow incoming = context.element();
  // Order matters: strip the version before deciding whether a project prefix is needed.
  String featureSetId = applyDefaultProject(stripVersion(incoming.getFeatureSet()));
  context.output(incoming.toBuilder().setFeatureSet(featureSetId).build());
}

Expand All @@ -34,4 +41,12 @@ private String stripVersion(String featureSetId) {
String[] split = featureSetId.split(":");
return split[0];
}

/**
 * Prefixes the default project onto a feature set reference that has no project part.
 *
 * <p>A reference counts as project-less when splitting it on {@code /} yields exactly one
 * piece. Because {@link String#split(String)} discards trailing empty strings, a reference such
 * as {@code "name/"} also yields one piece and is prefixed as well.
 *
 * @param featureSetId feature set reference, already stripped of its version
 * @return {@code defaultProject + "/" + featureSetId} when no project part is present, otherwise
 *     {@code featureSetId} unchanged
 */
private String applyDefaultProject(String featureSetId) {
  boolean lacksProject = featureSetId.split("/").length == 1;
  return lacksProject ? defaultProject + "/" + featureSetId : featureSetId;
}
}
1 change: 1 addition & 0 deletions ingestion/src/test/java/feast/ingestion/ImportJobTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ public void runPipeline_ShouldWriteToRedisCorrectlyGivenValidSpecAndFeatureRow()
});
options.setFeatureSetJson(compressor.compress(spec));
options.setStoreJson(Collections.singletonList(JsonFormat.printer().print(redis)));
options.setDefaultFeastProject("myproject");
options.setProject("");
options.setBlockOnRun(false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public void shouldWriteSuccessAndFailureTagsCorrectly() {
.setCoder(ProtoCoder.of(FeatureRow.class))
.apply(
ProcessAndValidateFeatureRows.newBuilder()
.setDefaultProject("myproject")
.setFailureTag(FAILURE_TAG)
.setSuccessTag(SUCCESS_TAG)
.setFeatureSetSpecs(featureSetSpecs)
Expand Down Expand Up @@ -158,6 +159,56 @@ public void shouldStripVersions() {
.setCoder(ProtoCoder.of(FeatureRow.class))
.apply(
ProcessAndValidateFeatureRows.newBuilder()
.setDefaultProject("myproject")
.setFailureTag(FAILURE_TAG)
.setSuccessTag(SUCCESS_TAG)
.setFeatureSetSpecs(featureSetSpecs)
.build());

PAssert.that(output.get(SUCCESS_TAG)).containsInAnyOrder(expected);

p.run();
}

@Test
public void shouldApplyDefaultProject() {
FeatureSetSpec fs1 =
FeatureSetSpec.newBuilder()
.setName("feature_set")
.setProject("myproject")
.addEntities(
EntitySpec.newBuilder()
.setName("entity_id_primary")
.setValueType(Enum.INT32)
.build())
.addEntities(
EntitySpec.newBuilder()
.setName("entity_id_secondary")
.setValueType(Enum.STRING)
.build())
.addFeatures(
FeatureSpec.newBuilder().setName("feature_1").setValueType(Enum.STRING).build())
.addFeatures(
FeatureSpec.newBuilder().setName("feature_2").setValueType(Enum.INT64).build())
.build();

Map<String, FeatureSetSpec> featureSetSpecs = new HashMap<>();
featureSetSpecs.put("myproject/feature_set", fs1);

List<FeatureRow> input = new ArrayList<>();
List<FeatureRow> expected = new ArrayList<>();

FeatureRow randomRow = TestUtil.createRandomFeatureRow(fs1);
expected.add(randomRow);
randomRow = randomRow.toBuilder().setFeatureSet("feature_set").build();
input.add(randomRow);

PCollectionTuple output =
p.apply(Create.of(input))
.setCoder(ProtoCoder.of(FeatureRow.class))
.apply(
ProcessAndValidateFeatureRows.newBuilder()
.setDefaultProject("myproject")
.setFailureTag(FAILURE_TAG)
.setSuccessTag(SUCCESS_TAG)
.setFeatureSetSpecs(featureSetSpecs)
Expand Down Expand Up @@ -212,6 +263,7 @@ public void shouldExcludeUnregisteredFields() {
.setCoder(ProtoCoder.of(FeatureRow.class))
.apply(
ProcessAndValidateFeatureRows.newBuilder()
.setDefaultProject("myproject")
.setFailureTag(FAILURE_TAG)
.setSuccessTag(SUCCESS_TAG)
.setFeatureSetSpecs(featureSets)
Expand Down

0 comments on commit d8459e0

Please sign in to comment.