From d8459e0115ccfabe145ae7a43a5a57b30c8b8922 Mon Sep 17 00:00:00 2001 From: Chen Zhiling Date: Tue, 19 May 2020 08:21:55 +0800 Subject: [PATCH] Apply default project to rows without project during ingestion (#701) * Apply default project to incoming rows without project defined * Apply spotless * Fix broken test --- .../core/job/dataflow/DataflowJobManager.java | 1 + .../job/direct/DirectRunnerJobManager.java | 2 + .../main/java/feast/core/model/Project.java | 1 + .../main/java/feast/ingestion/ImportJob.java | 1 + .../ingestion/options/ImportOptions.java | 7 +++ .../ProcessAndValidateFeatureRows.java | 6 ++- .../transform/fn/ProcessFeatureRowDoFn.java | 19 ++++++- .../java/feast/ingestion/ImportJobTest.java | 1 + .../ProcessAndValidateFeatureRowsTest.java | 52 +++++++++++++++++++ 9 files changed, 87 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java b/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java index d53fc7cf6c..7b9df0abd5 100644 --- a/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java +++ b/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java @@ -280,6 +280,7 @@ private ImportOptions getPipelineOptions( pipelineOptions.setFeatureSetJson(featureSetJsonCompressor.compress(featureSets)); pipelineOptions.setStoreJson(Collections.singletonList(JsonFormat.printer().print(sink))); pipelineOptions.setProject(projectId); + pipelineOptions.setDefaultFeastProject(Project.DEFAULT_NAME); pipelineOptions.setUpdate(update); pipelineOptions.setRunner(DataflowRunner.class); pipelineOptions.setJobName(jobName); diff --git a/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java b/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java index 4bcff01686..2e2b43047e 100644 --- a/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java +++ b/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java @@ -26,6 +26,7 @@ import feast.core.model.FeatureSet; import feast.core.model.Job; import feast.core.model.JobStatus; +import feast.core.model.Project; import feast.core.util.TypeConversion; import feast.ingestion.ImportJob; import feast.ingestion.options.BZip2Compressor; @@ -105,6 +106,7 @@ private ImportOptions getPipelineOptions( pipelineOptions.setJobName(jobName); pipelineOptions.setStoreJson(Collections.singletonList(JsonFormat.printer().print(sink))); pipelineOptions.setRunner(DirectRunner.class); + pipelineOptions.setDefaultFeastProject(Project.DEFAULT_NAME); pipelineOptions.setProject(""); // set to default value to satisfy validation if (metrics.isEnabled()) { pipelineOptions.setMetricsExporterType(metrics.getType()); diff --git a/core/src/main/java/feast/core/model/Project.java b/core/src/main/java/feast/core/model/Project.java index d6e6149394..c55830c824 100644 --- a/core/src/main/java/feast/core/model/Project.java +++ b/core/src/main/java/feast/core/model/Project.java @@ -34,6 +34,7 @@ @Entity @Table(name = "projects") public class Project { + public static final String DEFAULT_NAME = "default"; // Name of the project @Id diff --git a/ingestion/src/main/java/feast/ingestion/ImportJob.java b/ingestion/src/main/java/feast/ingestion/ImportJob.java index 0420ddcabb..16efe11f55 100644 --- a/ingestion/src/main/java/feast/ingestion/ImportJob.java +++ b/ingestion/src/main/java/feast/ingestion/ImportJob.java @@ -130,6 +130,7 @@ public static PipelineResult runPipeline(ImportOptions options) throws IOExcepti .get(FEATURE_ROW_OUT) .apply( ProcessAndValidateFeatureRows.newBuilder() + .setDefaultProject(options.getDefaultFeastProject()) .setFeatureSetSpecs(featureSetSpecsByKey) .setSuccessTag(FEATURE_ROW_OUT) .setFailureTag(DEADLETTER_OUT) diff --git a/ingestion/src/main/java/feast/ingestion/options/ImportOptions.java b/ingestion/src/main/java/feast/ingestion/options/ImportOptions.java index 1fa127d662..e3a1b841c4 100644 --- a/ingestion/src/main/java/feast/ingestion/options/ImportOptions.java +++ b/ingestion/src/main/java/feast/ingestion/options/ImportOptions.java @@ -27,6 +27,13 @@ /** Options passed to Beam to influence the job's execution environment */ public interface ImportOptions extends PipelineOptions, DataflowPipelineOptions, DirectOptions { + @Required + @Description( + "Default feast project to apply to incoming rows that do not specify project in its feature set reference.") + String getDefaultFeastProject(); + + void setDefaultFeastProject(String defaultProject); + @Required @Description( "JSON string representation of the FeatureSet that the import job will process, in BZip2 binary format." diff --git a/ingestion/src/main/java/feast/ingestion/transform/ProcessAndValidateFeatureRows.java b/ingestion/src/main/java/feast/ingestion/transform/ProcessAndValidateFeatureRows.java index 50c3e0ee4f..2ce8918ccf 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/ProcessAndValidateFeatureRows.java +++ b/ingestion/src/main/java/feast/ingestion/transform/ProcessAndValidateFeatureRows.java @@ -39,6 +39,8 @@ public abstract class ProcessAndValidateFeatureRows public abstract Map getFeatureSetSpecs(); + public abstract String getDefaultProject(); + public abstract TupleTag getSuccessTag(); public abstract TupleTag getFailureTag(); @@ -53,6 +55,8 @@ public abstract static class Builder { public abstract Builder setFeatureSetSpecs( Map featureSets); + public abstract Builder setDefaultProject(String defaultProject); + public abstract Builder setSuccessTag(TupleTag successTag); public abstract Builder setFailureTag(TupleTag failureTag); @@ -69,7 +73,7 @@ public PCollectionTuple expand(PCollection input) { .collect(Collectors.toMap(Pair::getLeft, Pair::getRight)); return input - .apply("ProcessFeatureRows", ParDo.of(new ProcessFeatureRowDoFn())) + .apply("ProcessFeatureRows", ParDo.of(new ProcessFeatureRowDoFn(getDefaultProject()))) .apply( "ValidateFeatureRows", ParDo.of( diff --git a/ingestion/src/main/java/feast/ingestion/transform/fn/ProcessFeatureRowDoFn.java b/ingestion/src/main/java/feast/ingestion/transform/fn/ProcessFeatureRowDoFn.java index 2a115a8071..3680348cf0 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/fn/ProcessFeatureRowDoFn.java +++ b/ingestion/src/main/java/feast/ingestion/transform/fn/ProcessFeatureRowDoFn.java @@ -21,11 +21,18 @@ public class ProcessFeatureRowDoFn extends DoFn { + private String defaultProject; + + public ProcessFeatureRowDoFn(String defaultProject) { + this.defaultProject = defaultProject; + } + @ProcessElement public void processElement(ProcessContext context) { FeatureRow featureRow = context.element(); - featureRow = - featureRow.toBuilder().setFeatureSet(stripVersion(featureRow.getFeatureSet())).build(); + String featureSetId = stripVersion(featureRow.getFeatureSet()); + featureSetId = applyDefaultProject(featureSetId); + featureRow = featureRow.toBuilder().setFeatureSet(featureSetId).build(); context.output(featureRow); } @@ -34,4 +41,12 @@ private String stripVersion(String featureSetId) { String[] split = featureSetId.split(":"); return split[0]; } + + private String applyDefaultProject(String featureSetId) { + String[] split = featureSetId.split("/"); + if (split.length == 1) { + return defaultProject + "/" + featureSetId; + } + return featureSetId; + } } diff --git a/ingestion/src/test/java/feast/ingestion/ImportJobTest.java b/ingestion/src/test/java/feast/ingestion/ImportJobTest.java index 16d27303cd..39e4296378 100644 --- a/ingestion/src/test/java/feast/ingestion/ImportJobTest.java +++ b/ingestion/src/test/java/feast/ingestion/ImportJobTest.java @@ -176,6 +176,7 @@ public void runPipeline_ShouldWriteToRedisCorrectlyGivenValidSpecAndFeatureRow() }); options.setFeatureSetJson(compressor.compress(spec)); options.setStoreJson(Collections.singletonList(JsonFormat.printer().print(redis))); + options.setDefaultFeastProject("myproject"); options.setProject(""); options.setBlockOnRun(false); diff --git a/ingestion/src/test/java/feast/ingestion/transform/ProcessAndValidateFeatureRowsTest.java b/ingestion/src/test/java/feast/ingestion/transform/ProcessAndValidateFeatureRowsTest.java index 96105e3789..8c5d7bd8ed 100644 --- a/ingestion/src/test/java/feast/ingestion/transform/ProcessAndValidateFeatureRowsTest.java +++ b/ingestion/src/test/java/feast/ingestion/transform/ProcessAndValidateFeatureRowsTest.java @@ -109,6 +109,7 @@ public void shouldWriteSuccessAndFailureTagsCorrectly() { .setCoder(ProtoCoder.of(FeatureRow.class)) .apply( ProcessAndValidateFeatureRows.newBuilder() + .setDefaultProject("myproject") .setFailureTag(FAILURE_TAG) .setSuccessTag(SUCCESS_TAG) .setFeatureSetSpecs(featureSetSpecs) @@ -158,6 +159,56 @@ public void shouldStripVersions() { .setCoder(ProtoCoder.of(FeatureRow.class)) .apply( ProcessAndValidateFeatureRows.newBuilder() + .setDefaultProject("myproject") + .setFailureTag(FAILURE_TAG) + .setSuccessTag(SUCCESS_TAG) + .setFeatureSetSpecs(featureSetSpecs) + .build()); + + PAssert.that(output.get(SUCCESS_TAG)).containsInAnyOrder(expected); + + p.run(); + } + + @Test + public void shouldApplyDefaultProject() { + FeatureSetSpec fs1 = + FeatureSetSpec.newBuilder() + .setName("feature_set") + .setProject("myproject") + .addEntities( + EntitySpec.newBuilder() + .setName("entity_id_primary") + .setValueType(Enum.INT32) + .build()) + .addEntities( + EntitySpec.newBuilder() + .setName("entity_id_secondary") + .setValueType(Enum.STRING) + .build()) + .addFeatures( + FeatureSpec.newBuilder().setName("feature_1").setValueType(Enum.STRING).build()) + .addFeatures( + FeatureSpec.newBuilder().setName("feature_2").setValueType(Enum.INT64).build()) + .build(); + + Map featureSetSpecs = new HashMap<>(); + featureSetSpecs.put("myproject/feature_set", fs1); + + List input = new ArrayList<>(); + List expected = new ArrayList<>(); + + FeatureRow randomRow = TestUtil.createRandomFeatureRow(fs1); + expected.add(randomRow); + randomRow = randomRow.toBuilder().setFeatureSet("feature_set").build(); + input.add(randomRow); + + PCollectionTuple output = + p.apply(Create.of(input)) + .setCoder(ProtoCoder.of(FeatureRow.class)) + .apply( + ProcessAndValidateFeatureRows.newBuilder() + .setDefaultProject("myproject") .setFailureTag(FAILURE_TAG) .setSuccessTag(SUCCESS_TAG) .setFeatureSetSpecs(featureSetSpecs) @@ -212,6 +263,7 @@ public void shouldExcludeUnregisteredFields() { .setCoder(ProtoCoder.of(FeatureRow.class)) .apply( ProcessAndValidateFeatureRows.newBuilder() + .setDefaultProject("myproject") .setFailureTag(FAILURE_TAG) .setSuccessTag(SUCCESS_TAG) .setFeatureSetSpecs(featureSets)