From 71f2fae01b5e5f1cf80c879892bd154015060765 Mon Sep 17 00:00:00 2001
From: Pradithya Aria
Date: Fri, 18 Jan 2019 12:27:36 +0800
Subject: [PATCH] Avoid throwing an error when no storage spec for
 warehouse/serving is registered

---
 .../transform/SplitOutputByStore.java     |   1 -
 .../transform/SplitOutputByStoreTest.java | 121 ++++++++++++++++++
 2 files changed, 121 insertions(+), 1 deletion(-)

diff --git a/ingestion/src/main/java/feast/ingestion/transform/SplitOutputByStore.java b/ingestion/src/main/java/feast/ingestion/transform/SplitOutputByStore.java
index 8c123a709f..cb11490dd3 100644
--- a/ingestion/src/main/java/feast/ingestion/transform/SplitOutputByStore.java
+++ b/ingestion/src/main/java/feast/ingestion/transform/SplitOutputByStore.java
@@ -54,7 +54,6 @@ public class SplitOutputByStore extends PTransform {
   public PFeatureRows expand(PFeatureRows input) {
     Map transforms = getFeatureStoreTransforms();
     Set keys = transforms.keySet();
-    Preconditions.checkArgument(transforms.size() > 0, "no write transforms found");
     log.info(String.format("Splitting on keys = [%s]", String.join(",", keys)));
     MultiOutputSplit splitter = new MultiOutputSplit<>(selector, keys, specs);
 
diff --git a/ingestion/src/test/java/feast/ingestion/transform/SplitOutputByStoreTest.java b/ingestion/src/test/java/feast/ingestion/transform/SplitOutputByStoreTest.java
index eade156989..09e8ef5434 100644
--- a/ingestion/src/test/java/feast/ingestion/transform/SplitOutputByStoreTest.java
+++ b/ingestion/src/test/java/feast/ingestion/transform/SplitOutputByStoreTest.java
@@ -25,6 +25,7 @@
 import feast.storage.MockTransforms;
 import feast.types.FeatureRowExtendedProto.FeatureRowExtended;
 import feast.types.FeatureRowProto.FeatureRow;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.sdk.testing.PAssert;
@@ -240,6 +241,126 @@ public void testSplitWhereFeature2HasNoStoreId() {
   }
+  @Test
+  public void testSplitWhereNoStorageSpec() {
+    // There is no storage spec registered and feature1 has no store set;
+    // feature1 should pass through to the main output without raising an error.
+
+    SerializableFunction<FeatureSpec, String> selector =
+        (fs) -> fs.getDataStores().getServing().getId();
+    MockSpecService specService = new MockSpecService();
+    specService.entitySpecs.put("e1", EntitySpec.getDefaultInstance());
+    specService.featureSpecs.put(
+        "f1", FeatureSpec.newBuilder().setEntity("e1").build());
+
+    Specs specs =
+        Specs.of(
+            "jobname",
+            ImportSpec.newBuilder()
+                .addEntities("e1")
+                .setSchema(
+                    Schema.newBuilder()
+                        .addAllFields(
+                            Collections.singletonList(
+                                Field.newBuilder().setFeatureId("f1").build())))
+                .build(),
+            specService);
+    assertNull(specs.getError());
+    List stores = Collections.emptyList();
+    SplitOutputByStore split = new SplitOutputByStore(stores, selector, specs);
+
+    PCollection<FeatureRowExtended> input =
+        pipeline
+            .apply(
+                Create.of(
+                    FeatureRow.newBuilder()
+                        .addFeatures(Features.of("f1", Values.ofInt32(1)))
+                        .build()))
+            .apply(new ToFeatureRowExtended());
+    PFeatureRows pfrows = PFeatureRows.of(input);
+    pfrows = pfrows.apply("do split", split);
+
+    PAssert.that(pfrows.getErrors()).empty();
+    PAssert.that(
+            pfrows
+                .getMain()
+                .apply(
+                    MapElements.into(TypeDescriptor.of(FeatureRow.class))
+                        .via(FeatureRowExtended::getRow)))
+        .containsInAnyOrder(
+            FeatureRow.newBuilder()
+                .addFeatures(Features.of("f1", Values.ofInt32(1)))
+                .setEventTimestamp(Timestamp.getDefaultInstance())
+                .build());
+
+    pipeline.run();
+  }
+
+  @Test
+  public void testSplitWhereNoStorageSpecForAFeature() {
+    // There is no storage spec registered, but feature1 has a serving store set;
+    // feature1 should still pass through to the main output without raising an error.
+
+    SerializableFunction<FeatureSpec, String> selector =
+        (fs) -> fs.getDataStores().getServing().getId();
+    MockSpecService specService = new MockSpecService();
+    specService.entitySpecs.put("e1", EntitySpec.getDefaultInstance());
+    specService.featureSpecs.put(
+        "f1",
+        FeatureSpec.newBuilder()
+            .setEntity("e1")
+            .setDataStores(
+                DataStores.newBuilder()
+                    .setServing(DataStore.newBuilder().setId("store1").build()))
+            .build());
+
+    Specs specs =
+        Specs.of(
+            "jobname",
+            ImportSpec.newBuilder()
+                .addEntities("e1")
+                .setSchema(
+                    Schema.newBuilder()
+                        .addAllFields(
+                            Collections.singletonList(
+                                Field.newBuilder().setFeatureId("f1").build())))
+                .build(),
+            specService);
+    assertNull(specs.getError());
+    List stores = Collections.emptyList();
+    SplitOutputByStore split = new SplitOutputByStore(stores, selector, specs);
+
+    PCollection<FeatureRowExtended> input =
+        pipeline
+            .apply(
+                Create.of(
+                    FeatureRow.newBuilder()
+                        .addFeatures(Features.of("f1", Values.ofInt32(1)))
+                        .build()))
+            .apply(new ToFeatureRowExtended());
+    PFeatureRows pfrows = PFeatureRows.of(input);
+    pfrows = pfrows.apply("do split", split);
+
+    PAssert.that(pfrows.getErrors()).empty();
+    PAssert.that(
+            pfrows
+                .getMain()
+                .apply(
+                    MapElements.into(TypeDescriptor.of(FeatureRow.class))
+                        .via(FeatureRowExtended::getRow)))
+        .containsInAnyOrder(
+            FeatureRow.newBuilder()
+                .addFeatures(Features.of("f1", Values.ofInt32(1)))
+                .setEventTimestamp(Timestamp.getDefaultInstance())
+                .build());
+
+    pipeline.run();
+  }
+
   @Test
   public void testWriteTags() {
     TupleTag tag1 = new TupleTag<>("TAG1");