From 3819c0783da7951263f1d503125f84babb2c95b2 Mon Sep 17 00:00:00 2001 From: Zhu Zhanyan Date: Wed, 29 Jul 2020 17:55:56 +0800 Subject: [PATCH 01/16] Update RedisCustomIO to write FeatureRows with field's name set to hash of field. --- .../redis/writer/RedisCustomIO.java | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/writer/RedisCustomIO.java b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/writer/RedisCustomIO.java index dcd2e5bfda..04ae3bdb08 100644 --- a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/writer/RedisCustomIO.java +++ b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/writer/RedisCustomIO.java @@ -206,25 +206,33 @@ private byte[] getKey(FeatureRow featureRow, FeatureSetSpec spec) { private byte[] getValue(FeatureRow featureRow, FeatureSetSpec spec) { List featureNames = spec.getFeaturesList().stream().map(FeatureSpec::getName).collect(Collectors.toList()); - Map fieldValueOnlyMap = + + Map fieldValueOnlyMap = featureRow.getFieldsList().stream() .filter(field -> featureNames.contains(field.getName())) .distinct() .collect( Collectors.toMap( Field::getName, - field -> Field.newBuilder().setValue(field.getValue()).build())); + field -> Field.newBuilder().setValue(field.getValue()))); List values = featureNames.stream() .sorted() .map( - featureName -> - fieldValueOnlyMap.getOrDefault( + featureName -> { + Field.Builder field = fieldValueOnlyMap.getOrDefault( featureName, Field.newBuilder() - .setValue(ValueProto.Value.getDefaultInstance()) - .build())) + .setValue(ValueProto.Value.getDefaultInstance())); + + // Set the name of the field to the hash of the field name. + // Use hash of name instead of the name of the field to reduce redis + // storage consumption per feature row stored. 
+ field.setName(String.format("%d", featureName.hashCode())); + + return field.build(); + }) .collect(Collectors.toList()); return FeatureRow.newBuilder() From 0ba8fce61a24e7477e82458a1a1d47b7022e44aa Mon Sep 17 00:00:00 2001 From: Zhu Zhanyan Date: Thu, 30 Jul 2020 10:13:28 +0800 Subject: [PATCH 02/16] Update FeatureRowDecoder to decode by name hash instead of order --- .../redis/retriever/FeatureRowDecoder.java | 113 ++++++++++++------ .../RedisClusterOnlineRetriever.java | 16 +-- .../redis/retriever/RedisOnlineRetriever.java | 10 +- .../redis/writer/RedisCustomIO.java | 40 ++++--- .../retriever/FeatureRowDecoderTest.java | 103 ++++++++++++++-- 5 files changed, 205 insertions(+), 77 deletions(-) diff --git a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/FeatureRowDecoder.java b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/FeatureRowDecoder.java index aad3147f71..91849fa73c 100644 --- a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/FeatureRowDecoder.java +++ b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/FeatureRowDecoder.java @@ -16,12 +16,17 @@ */ package feast.storage.connectors.redis.retriever; +import com.google.common.hash.Hashing; import feast.proto.core.FeatureSetProto.FeatureSetSpec; import feast.proto.core.FeatureSetProto.FeatureSpec; import feast.proto.types.FeatureRowProto.FeatureRow; import feast.proto.types.FieldProto.Field; +import feast.proto.types.ValueProto.Value; +import feast.storage.connectors.redis.writer.RedisCustomIO; +import java.nio.charset.StandardCharsets; import java.util.Comparator; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -36,60 +41,100 @@ public FeatureRowDecoder(String featureSetRef, FeatureSetSpec spec) { } /** - * A feature row is considered encoded if the feature set and field names are not set. 
This method - * is required for backward compatibility purposes, to allow Feast serving to continue serving non - * encoded Feature Row ingested by an older version of Feast. + * Check if encoded feature row v1 is encoded. A Feature Row v1 is considered encoded if both it' + * feature set reference and fields names are not set. The no. of fields in the feature row should + * also match up with the number of fields in the Feature Set spec. NOTE: This method is + * deprecated and will be removed in Feast v0.7. * * @param featureRow Feature row * @return boolean */ - public boolean isEncoded(FeatureRow featureRow) { + @Deprecated + private boolean isEncodedV1(FeatureRow featureRow) { return featureRow.getFeatureSet().isEmpty() - && featureRow.getFieldsList().stream().allMatch(field -> field.getName().isEmpty()); + && featureRow.getFieldsList().stream().allMatch(field -> field.getName().isEmpty()) + && featureRow.getFieldsList().size() == spec.getFeaturesList().size(); } /** - * Validates if an encoded feature row can be decoded without exception. + * Check if encoded feature row v2 is encoded. A Feature Row v2 is considered encoded if it's both + * it feature set reference and fields names are set. * * @param featureRow Feature row * @return boolean */ - public boolean isEncodingValid(FeatureRow featureRow) { - return featureRow.getFieldsList().size() == spec.getFeaturesList().size(); + private boolean isEncodedV2(FeatureRow featureRow) { + return featureRow.getFieldsList().stream().allMatch(field -> !field.getName().isEmpty()); } /** - * Decoding feature row by repopulating the field names based on the corresponding feature set - * spec. + * Decode feature row encoded by {@link RedisCustomIO}. 
NOTE: support for decoding feature row v1 + * will be dropped in Feast 0.7 * + * @throws IllegalArgumentException if unable to the decode the given feature row * @param encodedFeatureRow Feature row * @return boolean */ public FeatureRow decode(FeatureRow encodedFeatureRow) { - final List fieldsWithoutName = encodedFeatureRow.getFieldsList(); + if (isEncodedV1(encodedFeatureRow)) { + // TODO: remove support for v1 feature row in Feast 0.7 + final List fieldsWithoutName = encodedFeatureRow.getFieldsList(); + List featureNames = + spec.getFeaturesList().stream() + .sorted(Comparator.comparing(FeatureSpec::getName)) + .map(FeatureSpec::getName) + .collect(Collectors.toList()); - List featureNames = - spec.getFeaturesList().stream() - .sorted(Comparator.comparing(FeatureSpec::getName)) - .map(FeatureSpec::getName) - .collect(Collectors.toList()); - List fields = - IntStream.range(0, featureNames.size()) - .mapToObj( - featureNameIndex -> { - String featureName = featureNames.get(featureNameIndex); - return fieldsWithoutName - .get(featureNameIndex) - .toBuilder() - .setName(featureName) - .build(); - }) - .collect(Collectors.toList()); - return encodedFeatureRow - .toBuilder() - .clearFields() - .setFeatureSet(featureSetRef) - .addAllFields(fields) - .build(); + List fields = + IntStream.range(0, featureNames.size()) + .mapToObj( + featureNameIndex -> { + String featureName = featureNames.get(featureNameIndex); + return fieldsWithoutName + .get(featureNameIndex) + .toBuilder() + .setName(featureName) + .build(); + }) + .collect(Collectors.toList()); + + return encodedFeatureRow + .toBuilder() + .clearFields() + .setFeatureSet(featureSetRef) + .addAllFields(fields) + .build(); + + } else if (isEncodedV2(encodedFeatureRow)) { + // Encoded Feature Row v2 uses a hashed name as the field name and does not have feature set + // reference set. + // Decoding reverts the field name to a unhashed string and set feature set reference. 
+ Map nameHashValueMap = + encodedFeatureRow.getFieldsList().stream() + .collect(Collectors.toMap(field -> field.getName(), field -> field.getValue())); + + List featureNames = + spec.getFeaturesList().stream().map(FeatureSpec::getName).collect(Collectors.toList()); + + List fields = + featureNames.stream() + .map( + name -> { + String nameHash = + Hashing.murmur3_32().hashString(name, StandardCharsets.UTF_8).toString(); + Value value = + nameHashValueMap.getOrDefault(nameHash, Value.newBuilder().build()); + return Field.newBuilder().setName(name).setValue(value).build(); + }) + .collect(Collectors.toList()); + + return encodedFeatureRow + .toBuilder() + .clearFields() + .setFeatureSet(featureSetRef) + .addAllFields(fields) + .build(); + } + throw new IllegalArgumentException("Failed to decode FeatureRow row: Possible data corruption"); } } diff --git a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClusterOnlineRetriever.java b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClusterOnlineRetriever.java index c006149cd5..2146ec2f87 100644 --- a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClusterOnlineRetriever.java +++ b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClusterOnlineRetriever.java @@ -158,17 +158,11 @@ private List> getFeaturesFromRedis( // decode feature rows from data bytes using decoder. 
FeatureRow featureRow = FeatureRow.parseFrom(featureRowBytes); - if (decoder.isEncoded(featureRow)) { - if (decoder.isEncodingValid(featureRow)) { - featureRow = decoder.decode(featureRow); - } else { - // decoding feature row failed: data corruption could have occurred - throw Status.DATA_LOSS - .withDescription( - "Failed to decode FeatureRow from bytes retrieved from redis" - + ": Possible data corruption") - .asRuntimeException(); - } + try { + featureRow = decoder.decode(featureRow); + } catch (IllegalArgumentException e) { + // decoding feature row failed: data corruption could have occurred + throw Status.DATA_LOSS.withCause(e).withDescription(e.getMessage()).asRuntimeException(); } featureRows.add(Optional.of(featureRow)); } diff --git a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisOnlineRetriever.java b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisOnlineRetriever.java index ef80b06799..049175879d 100644 --- a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisOnlineRetriever.java +++ b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisOnlineRetriever.java @@ -151,15 +151,11 @@ private List> getFeaturesFromRedis( // decode feature rows from data bytes using decoder. 
FeatureRow featureRow = FeatureRow.parseFrom(featureRowBytes); - if (decoder.isEncoded(featureRow) && decoder.isEncodingValid(featureRow)) { + try { featureRow = decoder.decode(featureRow); - } else { + } catch (IllegalArgumentException e) { // decoding feature row failed: data corruption could have occurred - throw Status.DATA_LOSS - .withDescription( - "Failed to decode FeatureRow from bytes retrieved from redis" - + ": Possible data corruption") - .asRuntimeException(); + throw Status.DATA_LOSS.withCause(e).withDescription(e.getMessage()).asRuntimeException(); } featureRows.add(Optional.of(featureRow)); } diff --git a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/writer/RedisCustomIO.java b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/writer/RedisCustomIO.java index 04ae3bdb08..beb3e20192 100644 --- a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/writer/RedisCustomIO.java +++ b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/writer/RedisCustomIO.java @@ -18,6 +18,7 @@ import com.google.common.collect.Iterators; import com.google.common.collect.Lists; +import com.google.common.hash.Hashing; import feast.proto.core.FeatureSetProto.EntitySpec; import feast.proto.core.FeatureSetProto.FeatureSetSpec; import feast.proto.core.FeatureSetProto.FeatureSpec; @@ -29,7 +30,9 @@ import feast.storage.api.writer.FailedElement; import feast.storage.api.writer.WriteResult; import feast.storage.common.retry.Retriable; +import feast.storage.connectors.redis.retriever.FeatureRowDecoder; import io.lettuce.core.RedisException; +import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -203,35 +206,44 @@ private byte[] getKey(FeatureRow featureRow, FeatureSetSpec spec) { return redisKeyBuilder.build().toByteArray(); } + /** + * Encode the Feature Row as bytes to store in Redis in encoded Feature Row v2 format. 
+ * To reduce storage space consumption in redis, + * feature rows are "encoded" by hashing the fields names and not unsetting the feature + * set reference. {@link FeatureRowDecoder} is rensponsible for reversing this "encoding" step. + */ private byte[] getValue(FeatureRow featureRow, FeatureSetSpec spec) { List featureNames = spec.getFeaturesList().stream().map(FeatureSpec::getName).collect(Collectors.toList()); - + Map fieldValueOnlyMap = featureRow.getFieldsList().stream() .filter(field -> featureNames.contains(field.getName())) .distinct() .collect( Collectors.toMap( - Field::getName, - field -> Field.newBuilder().setValue(field.getValue()))); + Field::getName, field -> Field.newBuilder().setValue(field.getValue()))); List values = featureNames.stream() .sorted() .map( featureName -> { - Field.Builder field = fieldValueOnlyMap.getOrDefault( - featureName, - Field.newBuilder() - .setValue(ValueProto.Value.getDefaultInstance())); - - // Set the name of the field to the hash of the field name. - // Use hash of name instead of the name of the field to reduce redis - // storage consumption per feature row stored. - field.setName(String.format("%d", featureName.hashCode())); - - return field.build(); + Field.Builder field = + fieldValueOnlyMap.getOrDefault( + featureName, + Field.newBuilder().setValue(ValueProto.Value.getDefaultInstance())); + + // Encode the name of the as the hash of the field name. + // Use hash of name instead of the name of to reduce redis storage consumption + // per feature row stored. 
+ String nameHash = + Hashing.murmur3_32() + .hashString(featureName, StandardCharsets.UTF_8) + .toString(); + field.setName(nameHash); + + return field.build(); }) .collect(Collectors.toList()); diff --git a/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/retriever/FeatureRowDecoderTest.java b/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/retriever/FeatureRowDecoderTest.java index 63ad7aa26d..896a196fe2 100644 --- a/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/retriever/FeatureRowDecoderTest.java +++ b/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/retriever/FeatureRowDecoderTest.java @@ -18,6 +18,7 @@ import static org.junit.Assert.*; +import com.google.common.hash.Hashing; import com.google.protobuf.Timestamp; import feast.proto.core.FeatureSetProto; import feast.proto.core.FeatureSetProto.FeatureSetSpec; @@ -25,6 +26,7 @@ import feast.proto.types.FieldProto.Field; import feast.proto.types.ValueProto.Value; import feast.proto.types.ValueProto.ValueType; +import java.nio.charset.StandardCharsets; import java.util.Collections; import org.junit.Test; @@ -48,10 +50,29 @@ public class FeatureRowDecoderTest { .build(); @Test - public void featureRowWithFieldNamesIsNotConsideredAsEncoded() { - + public void shouldDecodeValidEncodedFeatureRowV2() { FeatureRowDecoder decoder = new FeatureRowDecoder("feature_set_ref", spec); - FeatureRowProto.FeatureRow nonEncodedFeatureRow = + + FeatureRowProto.FeatureRow encodedFeatureRow = + FeatureRowProto.FeatureRow.newBuilder() + .setEventTimestamp(Timestamp.newBuilder().setNanos(1000)) + .addFields( + Field.newBuilder() + .setName( + Hashing.murmur3_32() + .hashString("feature1", StandardCharsets.UTF_8) + .toString()) + .setValue(Value.newBuilder().setInt32Val(2))) + .addFields( + Field.newBuilder() + .setName( + Hashing.murmur3_32() + .hashString("feature2", StandardCharsets.UTF_8) + .toString()) + 
.setValue(Value.newBuilder().setFloatVal(1.0f))) + .build(); + + FeatureRowProto.FeatureRow expectedFeatureRow = FeatureRowProto.FeatureRow.newBuilder() .setFeatureSet("feature_set_ref") .setEventTimestamp(Timestamp.newBuilder().setNanos(1000)) @@ -62,26 +83,88 @@ public void featureRowWithFieldNamesIsNotConsideredAsEncoded() { .setName("feature2") .setValue(Value.newBuilder().setFloatVal(1.0f))) .build(); - assertFalse(decoder.isEncoded(nonEncodedFeatureRow)); + + assertEquals(expectedFeatureRow, decoder.decode(encodedFeatureRow)); } @Test - public void encodingIsInvalidIfNumberOfFeaturesInSpecDiffersFromFeatureRow() { - + public void shouldDecodeValidFeatureRowV2WithIncompleteFields() { FeatureRowDecoder decoder = new FeatureRowDecoder("feature_set_ref", spec); FeatureRowProto.FeatureRow encodedFeatureRow = FeatureRowProto.FeatureRow.newBuilder() .setEventTimestamp(Timestamp.newBuilder().setNanos(1000)) - .addFields(Field.newBuilder().setValue(Value.newBuilder().setInt32Val(2))) + .addFields( + Field.newBuilder() + .setName( + Hashing.murmur3_32() + .hashString("feature1", StandardCharsets.UTF_8) + .toString()) + .setValue(Value.newBuilder().setInt32Val(2))) + .build(); + + // should decode missing fields as fields with unset value. 
+ FeatureRowProto.FeatureRow expectedFeatureRow = + FeatureRowProto.FeatureRow.newBuilder() + .setFeatureSet("feature_set_ref") + .setEventTimestamp(Timestamp.newBuilder().setNanos(1000)) + .addFields( + Field.newBuilder().setName("feature1").setValue(Value.newBuilder().setInt32Val(2))) + .addFields(Field.newBuilder().setName("feature2").setValue(Value.newBuilder().build())) .build(); - assertFalse(decoder.isEncodingValid(encodedFeatureRow)); + assertEquals(expectedFeatureRow, decoder.decode(encodedFeatureRow)); } @Test - public void shouldDecodeValidEncodedFeatureRow() { + public void shouldDecodeValidFeatureRowV2AndIgnoreExtraFields() { + FeatureRowDecoder decoder = new FeatureRowDecoder("feature_set_ref", spec); + FeatureRowProto.FeatureRow encodedFeatureRow = + FeatureRowProto.FeatureRow.newBuilder() + .setEventTimestamp(Timestamp.newBuilder().setNanos(1000)) + .addFields( + Field.newBuilder() + .setName( + Hashing.murmur3_32() + .hashString("feature1", StandardCharsets.UTF_8) + .toString()) + .setValue(Value.newBuilder().setInt32Val(2))) + .addFields( + Field.newBuilder() + .setName( + Hashing.murmur3_32() + .hashString("feature2", StandardCharsets.UTF_8) + .toString()) + .setValue(Value.newBuilder().setFloatVal(1.0f))) + .addFields( + Field.newBuilder() + .setName( + Hashing.murmur3_32() + .hashString("feature3", StandardCharsets.UTF_8) + .toString()) + .setValue(Value.newBuilder().setStringVal("data"))) + .build(); + + // should decode missing fields as fields with unset value. 
+ FeatureRowProto.FeatureRow expectedFeatureRow = + FeatureRowProto.FeatureRow.newBuilder() + .setFeatureSet("feature_set_ref") + .setEventTimestamp(Timestamp.newBuilder().setNanos(1000)) + .addFields( + Field.newBuilder().setName("feature1").setValue(Value.newBuilder().setInt32Val(2))) + .addFields( + Field.newBuilder() + .setName("feature2") + .setValue(Value.newBuilder().setFloatVal(1.0f))) + .build(); + + assertEquals(expectedFeatureRow, decoder.decode(encodedFeatureRow)); + } + + // TODO: remove this test in Feast 0.7 when support for Feature Row v1 is removed + @Test + public void shouldDecodeValidEncodedFeatureRowV1() { FeatureRowDecoder decoder = new FeatureRowDecoder("feature_set_ref", spec); FeatureRowProto.FeatureRow encodedFeatureRow = @@ -103,8 +186,6 @@ public void shouldDecodeValidEncodedFeatureRow() { .setValue(Value.newBuilder().setFloatVal(1.0f))) .build(); - assertTrue(decoder.isEncoded(encodedFeatureRow)); - assertTrue(decoder.isEncodingValid(encodedFeatureRow)); assertEquals(expectedFeatureRow, decoder.decode(encodedFeatureRow)); } } From 4803ea599cdd14a7780129fb46f2b768a494adab Mon Sep 17 00:00:00 2001 From: Zhu Zhanyan Date: Thu, 30 Jul 2020 12:30:06 +0800 Subject: [PATCH 03/16] Bump pytest order numbers by 2 to make space for new tests --- tests/e2e/redis/basic-ingest-redis-serving.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/e2e/redis/basic-ingest-redis-serving.py b/tests/e2e/redis/basic-ingest-redis-serving.py index 341c789f76..63d5012c63 100644 --- a/tests/e2e/redis/basic-ingest-redis-serving.py +++ b/tests/e2e/redis/basic-ingest-redis-serving.py @@ -370,7 +370,7 @@ def list_entity_dataframe(): @pytest.mark.timeout(600) -@pytest.mark.run(order=14) +@pytest.mark.run(order=16) def test_basic_retrieve_online_entity_nonlistform( client, nonlist_entity_dataframe, list_entity_dataframe ): @@ -473,7 +473,7 @@ def try_get_features2(): @pytest.mark.timeout(600) -@pytest.mark.run(order=15) 
+@pytest.mark.run(order=17) def test_basic_retrieve_online_entity_listform(client, list_entity_dataframe): # Case 1: Features retrieval with entity in list format check district_fs = FeatureSet( @@ -570,7 +570,7 @@ def try_get_features2(): @pytest.mark.timeout(600) -@pytest.mark.run(order=16) +@pytest.mark.run(order=18) def test_basic_ingest_retrieval_fs(client): # Set to another project to test ingestion based on current project context client.set_project(PROJECT_NAME + "_NS1") @@ -623,7 +623,7 @@ def try_get_features(): @pytest.mark.timeout(600) -@pytest.mark.run(order=17) +@pytest.mark.run(order=19) def test_basic_ingest_retrieval_str(client): # Set to another project to test ingestion based on current project context client.set_project(PROJECT_NAME + "_NS1") From feb2438258baeccb4a5028e78679be72390bf344 Mon Sep 17 00:00:00 2001 From: Zhu Zhanyan Date: Thu, 30 Jul 2020 13:27:12 +0800 Subject: [PATCH 04/16] Revert "Bump pytest order numbers by 2 to make space for new tests" This reverts commit aecc9a6e9a70be3fd84d04f81442b518be01a4c6. 
--- tests/e2e/redis/basic-ingest-redis-serving.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/e2e/redis/basic-ingest-redis-serving.py b/tests/e2e/redis/basic-ingest-redis-serving.py index 63d5012c63..341c789f76 100644 --- a/tests/e2e/redis/basic-ingest-redis-serving.py +++ b/tests/e2e/redis/basic-ingest-redis-serving.py @@ -370,7 +370,7 @@ def list_entity_dataframe(): @pytest.mark.timeout(600) -@pytest.mark.run(order=16) +@pytest.mark.run(order=14) def test_basic_retrieve_online_entity_nonlistform( client, nonlist_entity_dataframe, list_entity_dataframe ): @@ -473,7 +473,7 @@ def try_get_features2(): @pytest.mark.timeout(600) -@pytest.mark.run(order=17) +@pytest.mark.run(order=15) def test_basic_retrieve_online_entity_listform(client, list_entity_dataframe): # Case 1: Features retrieval with entity in list format check district_fs = FeatureSet( @@ -570,7 +570,7 @@ def try_get_features2(): @pytest.mark.timeout(600) -@pytest.mark.run(order=18) +@pytest.mark.run(order=16) def test_basic_ingest_retrieval_fs(client): # Set to another project to test ingestion based on current project context client.set_project(PROJECT_NAME + "_NS1") @@ -623,7 +623,7 @@ def try_get_features(): @pytest.mark.timeout(600) -@pytest.mark.run(order=19) +@pytest.mark.run(order=17) def test_basic_ingest_retrieval_str(client): # Set to another project to test ingestion based on current project context client.set_project(PROJECT_NAME + "_NS1") From 3491a3ab6bebe3758f2711fc0e80a065155e333c Mon Sep 17 00:00:00 2001 From: Zhu Zhanyan Date: Thu, 30 Jul 2020 14:42:19 +0800 Subject: [PATCH 05/16] Added e2e to check that feature rows with missing or extra fields can be retrieved --- .../redis/writer/RedisCustomIO.java | 8 +- tests/e2e/redis/basic-ingest-redis-serving.py | 84 +++++++++++++++++++ 2 files changed, 88 insertions(+), 4 deletions(-) diff --git a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/writer/RedisCustomIO.java 
b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/writer/RedisCustomIO.java index beb3e20192..3c10299dfa 100644 --- a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/writer/RedisCustomIO.java +++ b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/writer/RedisCustomIO.java @@ -207,10 +207,10 @@ private byte[] getKey(FeatureRow featureRow, FeatureSetSpec spec) { } /** - * Encode the Feature Row as bytes to store in Redis in encoded Feature Row v2 format. - * To reduce storage space consumption in redis, - * feature rows are "encoded" by hashing the fields names and not unsetting the feature - * set reference. {@link FeatureRowDecoder} is rensponsible for reversing this "encoding" step. + * Encode the Feature Row as bytes to store in Redis in encoded Feature Row v2 format. To + * reduce storage space consumption in redis, feature rows are "encoded" by hashing the fields + * names and not unsetting the feature set reference. {@link FeatureRowDecoder} is + * rensponsible for reversing this "encoding" step. */ private byte[] getValue(FeatureRow featureRow, FeatureSetSpec spec) { List featureNames = diff --git a/tests/e2e/redis/basic-ingest-redis-serving.py b/tests/e2e/redis/basic-ingest-redis-serving.py index 341c789f76..046b638f88 100644 --- a/tests/e2e/redis/basic-ingest-redis-serving.py +++ b/tests/e2e/redis/basic-ingest-redis-serving.py @@ -675,6 +675,90 @@ def try_get_features(): assert online_features_actual.to_dict() == online_features_expected +@pytest.mark.timeout(600) +@pytest.mark.run(order=18) +def test_basic_retrieve_feature_row_missing_fields(client, cust_trans_df): + # reset the project to default + client.set_project() + + feature_refs = ["daily_transactions", "total_transactions", "null_values"] + # update cust_trans_fs with one additional feature. + # feature rows ingested before the feature set update will be missing a field. 
+ old_cust_trans_fs = client.get_feature_set(name="customer_transactions") + new_cust_trans_fs = client.get_feature_set(name="customer_transactions") + new_cust_trans_fs.add(Feature("n_trips", ValueType.INT64)) + client.apply(new_cust_trans_fs) + # sleep to ensure feature set is propagated + time.sleep(15) + + # attempt to retrieve features from feature rows with missing fields + def try_get_features(): + response = client.get_online_features( + entity_rows=[ + {"customer_id": np.int64(cust_trans_df.iloc[0]["customer_id"])} + ], + feature_refs=feature_refs + ["n_trips"], + ) # type: GetOnlineFeaturesResponse + # check if the ingested fields can be correctly retrieved. + is_ok = all( + [ + check_online_response(ref, cust_trans_df, response) + for ref in feature_refs + ] + ) + # should return null_value status for missing field n_trips + is_missing_ok = ( + response.field_values[0].statuses["n_trips"] + == GetOnlineFeaturesResponse.FieldStatus.NULL_VALUE + ) + return response, is_ok and is_missing_ok + + wait_retry_backoff( + retry_fn=try_get_features, + timeout_secs=90, + timeout_msg="Timed out trying to get online feature values", + ) + + +@pytest.mark.timeout(600) +@pytest.mark.run(order=19) +def test_basic_retrieve_feature_row_extra_fields(client, cust_trans_df): + # reset the project to default + client.set_project() + + feature_refs = ["daily_transactions", "total_transactions"] + # update cust_trans_fs with the null_values feature dropped. + # feature rows ingested before the feature set update will have an extra field. 
+ new_cust_trans_fs = client.get_feature_set(name="customer_transactions") + new_cust_trans_fs.drop("null_values") + client.apply(new_cust_trans_fs) + # sleep to ensure feature set update is propagated + time.sleep(15) + + # attempt to retrieve features from feature rows with extra fields + def try_get_features(): + response = client.get_online_features( + entity_rows=[ + {"customer_id": np.int64(cust_trans_df.iloc[0]["customer_id"])} + ], + feature_refs=feature_refs, + ) # type: GetOnlineFeaturesResponse + # check if the non dropped fields can be correctly retrieved. + is_ok = all( + [ + check_online_response(ref, cust_trans_df, response) + for ref in feature_refs + ] + ) + return response, is_ok + + wait_retry_backoff( + retry_fn=try_get_features, + timeout_secs=90, + timeout_msg="Timed out trying to get online feature values", + ) + + @pytest.fixture(scope="module") def all_types_dataframe(): return pd.DataFrame( From d2dcfb2bb3ebe34fa4f3402d912057be347a0c83 Mon Sep 17 00:00:00 2001 From: Zhu Zhanyan Date: Thu, 30 Jul 2020 15:45:20 +0800 Subject: [PATCH 06/16] Clarify docs about Feature Row v1 encoding and Feature Row v2 encoding --- .../redis/retriever/FeatureRowDecoder.java | 18 +++++++++--------- .../connectors/redis/writer/RedisCustomIO.java | 2 +- .../redis/retriever/FeatureRowDecoderTest.java | 2 +- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/FeatureRowDecoder.java b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/FeatureRowDecoder.java index 91849fa73c..1be5a0400e 100644 --- a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/FeatureRowDecoder.java +++ b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/FeatureRowDecoder.java @@ -41,10 +41,10 @@ public FeatureRowDecoder(String featureSetRef, FeatureSetSpec spec) { } /** - * Check if encoded feature row v1 is encoded. 
A Feature Row v1 is considered encoded if both it' - * feature set reference and fields names are not set. The no. of fields in the feature row should - * also match up with the number of fields in the Feature Set spec. NOTE: This method is - * deprecated and will be removed in Feast v0.7. + * Check if encoded feature row v1 is encoded. The Feature Row v1 encoding defines that a Feature + * Row is considered encoded if both its feature set reference and fields names are not set. + * The no. of fields in the feature row should also match up with the number of fields in the + * Feature Set spec. NOTE: This method is deprecated and will be removed in Feast v0.7. * * @param featureRow Feature row * @return boolean */ @@ -57,8 +57,8 @@ private boolean isEncodedV1(FeatureRow featureRow) { } /** - * Check if encoded feature row v2 is encoded. A Feature Row v2 is considered encoded if it's both - * it feature set reference and fields names are set. + * Check if encoded feature row v2 is encoded. The Feature Row v2 encoding defines that a Feature + * Row is considered encoded if both its feature set reference and fields names are set. * * @param featureRow Feature row * @return boolean */ @@ -68,8 +68,8 @@ private boolean isEncodedV2(FeatureRow featureRow) { } /** - * Decode feature row encoded by {@link RedisCustomIO}. NOTE: support for decoding feature row v1 - * will be dropped in Feast 0.7 + * Decode feature row encoded by {@link RedisCustomIO}.
NOTE: support for decoding Feature Row v1 + * encoding will be dropped in Feast 0.7 * * @throws IllegalArgumentException if unable to the decode the given feature row * @param encodedFeatureRow Feature row @@ -106,7 +106,7 @@ public FeatureRow decode(FeatureRow encodedFeatureRow) { .build(); } else if (isEncodedV2(encodedFeatureRow)) { - // Encoded Feature Row v2 uses a hashed name as the field name and does not have feature set + // Feature Row v2 encoding uses a hashed name as the field name and does not have feature set // reference set. // Decoding reverts the field name to a unhashed string and set feature set reference. Map nameHashValueMap = diff --git a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/writer/RedisCustomIO.java b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/writer/RedisCustomIO.java index 3c10299dfa..53dcba27e9 100644 --- a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/writer/RedisCustomIO.java +++ b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/writer/RedisCustomIO.java @@ -207,7 +207,7 @@ private byte[] getKey(FeatureRow featureRow, FeatureSetSpec spec) { } /** - * Encode the Feature Row as bytes to store in Redis in encoded Feature Row v2 format. To + * Encode the Feature Row as bytes to store in Redis in encoded Feature Row v2 encoding. To * reduce storage space consumption in redis, feature rows are "encoded" by hashing the fields * names and not unsetting the feature set reference. {@link FeatureRowDecoder} is * rensponsible for reversing this "encoding" step. 
diff --git a/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/retriever/FeatureRowDecoderTest.java b/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/retriever/FeatureRowDecoderTest.java index 896a196fe2..c843d31127 100644 --- a/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/retriever/FeatureRowDecoderTest.java +++ b/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/retriever/FeatureRowDecoderTest.java @@ -162,7 +162,7 @@ public void shouldDecodeValidFeatureRowV2AndIgnoreExtraFields() { assertEquals(expectedFeatureRow, decoder.decode(encodedFeatureRow)); } - // TODO: remove this test in Feast 0.7 when support for Feature Row v1 is removed + // TODO: remove this test in Feast 0.7 when support for Feature Row v1 encoding is removed @Test public void shouldDecodeValidEncodedFeatureRowV1() { FeatureRowDecoder decoder = new FeatureRowDecoder("feature_set_ref", spec); From 049b73fdc50729916dcf39a9f288811cfbc03e2a Mon Sep 17 00:00:00 2001 From: Zhu Zhanyan Date: Thu, 30 Jul 2020 16:03:37 +0800 Subject: [PATCH 07/16] Fix python lint --- tests/e2e/redis/basic-ingest-redis-serving.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/e2e/redis/basic-ingest-redis-serving.py b/tests/e2e/redis/basic-ingest-redis-serving.py index 046b638f88..8197601a5d 100644 --- a/tests/e2e/redis/basic-ingest-redis-serving.py +++ b/tests/e2e/redis/basic-ingest-redis-serving.py @@ -684,7 +684,6 @@ def test_basic_retrieve_feature_row_missing_fields(client, cust_trans_df): feature_refs = ["daily_transactions", "total_transactions", "null_values"] # update cust_trans_fs with one additional feature. # feature rows ingested before the feature set update will be missing a field. 
- old_cust_trans_fs = client.get_feature_set(name="customer_transactions") new_cust_trans_fs = client.get_feature_set(name="customer_transactions") new_cust_trans_fs.add(Feature("n_trips", ValueType.INT64)) client.apply(new_cust_trans_fs) From 1d97462325ff12acc0a7dccd2cee8f134e14d996 Mon Sep 17 00:00:00 2001 From: Zhu Zhanyan Date: Thu, 30 Jul 2020 16:12:09 +0800 Subject: [PATCH 08/16] Update FeatureRowDecoder's isEncodedV2 check to use anyMatch() --- .../storage/connectors/redis/retriever/FeatureRowDecoder.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/FeatureRowDecoder.java b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/FeatureRowDecoder.java index 1be5a0400e..c44e963610 100644 --- a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/FeatureRowDecoder.java +++ b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/FeatureRowDecoder.java @@ -64,7 +64,7 @@ private boolean isEncodedV1(FeatureRow featureRow) { * @return boolean */ private boolean isEncodedV2(FeatureRow featureRow) { - return featureRow.getFieldsList().stream().allMatch(field -> !field.getName().isEmpty()); + return featureRow.getFieldsList().stream().anyMatch(field -> field.getName().isEmpty()); } /** From 5eb13dbc02f81670fb59366b9a9139a97aa5f4cc Mon Sep 17 00:00:00 2001 From: Zhu Zhanyan Date: Thu, 30 Jul 2020 17:05:27 +0800 Subject: [PATCH 09/16] Make missing field/extra field e2e tests independent of other tests. 
--- tests/e2e/redis/basic-ingest-redis-serving.py | 25 +++++++++++++------ 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/tests/e2e/redis/basic-ingest-redis-serving.py b/tests/e2e/redis/basic-ingest-redis-serving.py index 8197601a5d..3c69b64831 100644 --- a/tests/e2e/redis/basic-ingest-redis-serving.py +++ b/tests/e2e/redis/basic-ingest-redis-serving.py @@ -678,16 +678,22 @@ def try_get_features(): @pytest.mark.timeout(600) @pytest.mark.run(order=18) def test_basic_retrieve_feature_row_missing_fields(client, cust_trans_df): - # reset the project to default - client.set_project() - feature_refs = ["daily_transactions", "total_transactions", "null_values"] + + # apply cust_trans_fs and ingest dataframe + client.set_project(PROJECT_NAME + "_basic_retrieve_missing_fields") + old_cust_trans_fs = FeatureSet.from_yaml( + f"{DIR_PATH}/basic/cust_trans_fs.yaml" + ) + client.apply(old_cust_trans_fs) + client.ingest(old_cust_trans_fs, cust_trans_df) + # update cust_trans_fs with one additional feature. # feature rows ingested before the feature set update will be missing a field. 
new_cust_trans_fs = client.get_feature_set(name="customer_transactions") new_cust_trans_fs.add(Feature("n_trips", ValueType.INT64)) client.apply(new_cust_trans_fs) - # sleep to ensure feature set is propagated + # sleep to ensure feature set update is propagated time.sleep(15) # attempt to retrieve features from feature rows with missing fields @@ -722,10 +728,15 @@ def try_get_features(): @pytest.mark.timeout(600) @pytest.mark.run(order=19) def test_basic_retrieve_feature_row_extra_fields(client, cust_trans_df): - # reset the project to default - client.set_project() - feature_refs = ["daily_transactions", "total_transactions"] + # apply cust_trans_fs and ingest dataframe + client.set_project(PROJECT_NAME + "_basic_retrieve_missing_fields") + old_cust_trans_fs = FeatureSet.from_yaml( + f"{DIR_PATH}/basic/cust_trans_fs.yaml" + ) + client.apply(old_cust_trans_fs) + client.ingest(old_cust_trans_fs, cust_trans_df) + # update cust_trans_fs with the null_values feature dropped. # feature rows ingested before the feature set update will have an extra field. 
new_cust_trans_fs = client.get_feature_set(name="customer_transactions") From 80f5b85200a4eb1c17a42282439fa206a941516a Mon Sep 17 00:00:00 2001 From: Zhu Zhanyan Date: Thu, 30 Jul 2020 17:10:04 +0800 Subject: [PATCH 10/16] Update FeatureRowDecoder if/else statement into 2 ifs --- .../storage/connectors/redis/retriever/FeatureRowDecoder.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/FeatureRowDecoder.java b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/FeatureRowDecoder.java index c44e963610..30f1ef864a 100644 --- a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/FeatureRowDecoder.java +++ b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/FeatureRowDecoder.java @@ -105,7 +105,8 @@ public FeatureRow decode(FeatureRow encodedFeatureRow) { .addAllFields(fields) .build(); - } else if (isEncodedV2(encodedFeatureRow)) { + } + if (isEncodedV2(encodedFeatureRow)) { // Feature Row v2 encoding uses a hashed name as the field name and does not have feature set // reference set. // Decoding reverts the field name to a unhashed string and set feature set reference. 
From dbd07447585893ab5dc131e025942e91d5757c1b Mon Sep 17 00:00:00 2001 From: Zhu Zhanyan Date: Thu, 30 Jul 2020 17:15:19 +0800 Subject: [PATCH 11/16] Fix python and java lint --- .../connectors/redis/retriever/FeatureRowDecoder.java | 11 +++++------ tests/e2e/redis/basic-ingest-redis-serving.py | 8 ++------ 2 files changed, 7 insertions(+), 12 deletions(-) diff --git a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/FeatureRowDecoder.java b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/FeatureRowDecoder.java index 30f1ef864a..d487636aa9 100644 --- a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/FeatureRowDecoder.java +++ b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/FeatureRowDecoder.java @@ -42,9 +42,9 @@ public FeatureRowDecoder(String featureSetRef, FeatureSetSpec spec) { /** * Check if encoded feature row v1 is encoded. The Feature Row v1 encoding defines that a Feature - * Row is is considered encoded if both it's feature set reference and fields names are not set . - * The no. of fields in the feature row should also match up with the number of fields in the - * Feature Set spec. NOTE: This method is deprecated and will be removed in Feast v0.7. + * Row is considered encoded if both it's feature set reference and fields names are not set . The + * no. of fields in the feature row should also match up with the number of fields in the Feature + * Set spec. NOTE: This method is deprecated and will be removed in Feast v0.7. * * @param featureRow Feature row * @return boolean @@ -58,13 +58,13 @@ private boolean isEncodedV1(FeatureRow featureRow) { /** * Check if encoded feature row v2 is encoded. The Feature Row v2 encoding defines that a Feature - * Row is f is considered encoded if it's both it feature set reference and fields names are set. 
+ * Row is considered encoded if it's both it feature set reference and fields names are set. * * @param featureRow Feature row * @return boolean */ private boolean isEncodedV2(FeatureRow featureRow) { - return featureRow.getFieldsList().stream().anyMatch(field -> field.getName().isEmpty()); + return !featureRow.getFieldsList().stream().anyMatch(field -> field.getName().isEmpty()); } /** @@ -104,7 +104,6 @@ public FeatureRow decode(FeatureRow encodedFeatureRow) { .setFeatureSet(featureSetRef) .addAllFields(fields) .build(); - } if (isEncodedV2(encodedFeatureRow)) { // Feature Row v2 encoding uses a hashed name as the field name and does not have feature set diff --git a/tests/e2e/redis/basic-ingest-redis-serving.py b/tests/e2e/redis/basic-ingest-redis-serving.py index 3c69b64831..1fcae69ed3 100644 --- a/tests/e2e/redis/basic-ingest-redis-serving.py +++ b/tests/e2e/redis/basic-ingest-redis-serving.py @@ -682,9 +682,7 @@ def test_basic_retrieve_feature_row_missing_fields(client, cust_trans_df): # apply cust_trans_fs and ingest dataframe client.set_project(PROJECT_NAME + "_basic_retrieve_missing_fields") - old_cust_trans_fs = FeatureSet.from_yaml( - f"{DIR_PATH}/basic/cust_trans_fs.yaml" - ) + old_cust_trans_fs = FeatureSet.from_yaml(f"{DIR_PATH}/basic/cust_trans_fs.yaml") client.apply(old_cust_trans_fs) client.ingest(old_cust_trans_fs, cust_trans_df) @@ -731,9 +729,7 @@ def test_basic_retrieve_feature_row_extra_fields(client, cust_trans_df): feature_refs = ["daily_transactions", "total_transactions"] # apply cust_trans_fs and ingest dataframe client.set_project(PROJECT_NAME + "_basic_retrieve_missing_fields") - old_cust_trans_fs = FeatureSet.from_yaml( - f"{DIR_PATH}/basic/cust_trans_fs.yaml" - ) + old_cust_trans_fs = FeatureSet.from_yaml(f"{DIR_PATH}/basic/cust_trans_fs.yaml") client.apply(old_cust_trans_fs) client.ingest(old_cust_trans_fs, cust_trans_df) From 25af940d305016797f64f81ae1b6c0b1c58a66ae Mon Sep 17 00:00:00 2001 From: Zhu Zhanyan Date: Thu, 30 Jul 2020 
18:31:48 +0800 Subject: [PATCH 12/16] Fix java unit test failures --- .../writer/RedisClusterFeatureSinkTest.java | 59 +++++++++++++---- .../redis/writer/RedisFeatureSinkTest.java | 64 +++++++++++++++---- 2 files changed, 100 insertions(+), 23 deletions(-) diff --git a/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/writer/RedisClusterFeatureSinkTest.java b/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/writer/RedisClusterFeatureSinkTest.java index 2adf0cec47..930d5ae7c4 100644 --- a/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/writer/RedisClusterFeatureSinkTest.java +++ b/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/writer/RedisClusterFeatureSinkTest.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.hash.Hashing; import com.google.protobuf.Timestamp; import feast.common.models.FeatureSetReference; import feast.proto.core.FeatureSetProto.EntitySpec; @@ -40,6 +41,7 @@ import io.lettuce.core.codec.ByteArrayCodec; import java.io.File; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.nio.file.Paths; import java.util.*; import java.util.concurrent.ScheduledFuture; @@ -160,7 +162,10 @@ public void shouldWriteToRedis() { .build(), FeatureRow.newBuilder() .setEventTimestamp(Timestamp.getDefaultInstance()) - .addFields(Field.newBuilder().setValue(Value.newBuilder().setStringVal("one"))) + .addFields( + Field.newBuilder() + .setName(hash("feature")) + .setValue(Value.newBuilder().setStringVal("one"))) .build()); kvs.put( RedisKey.newBuilder() @@ -169,7 +174,10 @@ public void shouldWriteToRedis() { .build(), FeatureRow.newBuilder() .setEventTimestamp(Timestamp.getDefaultInstance()) - .addFields(Field.newBuilder().setValue(Value.newBuilder().setStringVal("two"))) + .addFields( + Field.newBuilder() + .setName(hash("feature")) + 
.setValue(Value.newBuilder().setStringVal("two"))) .build()); List featureRows = @@ -205,7 +213,10 @@ public void shouldRetryFailConnection() throws InterruptedException { .build(), FeatureRow.newBuilder() .setEventTimestamp(Timestamp.getDefaultInstance()) - .addFields(Field.newBuilder().setValue(Value.newBuilder().setStringVal("one"))) + .addFields( + Field.newBuilder() + .setName(hash("feature")) + .setValue(Value.newBuilder().setStringVal("one"))) .build()); List featureRows = @@ -332,8 +343,14 @@ public void shouldConvertRowWithDuplicateEntitiesToValidKey() { FeatureRow expectedValue = FeatureRow.newBuilder() .setEventTimestamp(Timestamp.newBuilder().setSeconds(10)) - .addFields(Field.newBuilder().setValue(Value.newBuilder().setStringVal("strValue1"))) - .addFields(Field.newBuilder().setValue(Value.newBuilder().setInt64Val(1001))) + .addFields( + Field.newBuilder() + .setName(hash("feature_1")) + .setValue(Value.newBuilder().setStringVal("strValue1"))) + .addFields( + Field.newBuilder() + .setName(hash("feature_2")) + .setValue(Value.newBuilder().setInt64Val(1001))) .build(); p.apply(Create.of(offendingRow)).apply(redisClusterFeatureSink.writer()); @@ -383,8 +400,14 @@ public void shouldConvertRowWithOutOfOrderFieldsToValidKey() { List expectedFields = Arrays.asList( - Field.newBuilder().setValue(Value.newBuilder().setStringVal("strValue1")).build(), - Field.newBuilder().setValue(Value.newBuilder().setInt64Val(1001)).build()); + Field.newBuilder() + .setName(hash("feature_1")) + .setValue(Value.newBuilder().setStringVal("strValue1")) + .build(), + Field.newBuilder() + .setName(hash("feature_2")) + .setValue(Value.newBuilder().setInt64Val(1001)) + .build()); FeatureRow expectedValue = FeatureRow.newBuilder() .setEventTimestamp(Timestamp.newBuilder().setSeconds(10)) @@ -443,8 +466,14 @@ public void shouldMergeDuplicateFeatureFields() { FeatureRow expectedValue = FeatureRow.newBuilder() .setEventTimestamp(Timestamp.newBuilder().setSeconds(10)) - 
.addFields(Field.newBuilder().setValue(Value.newBuilder().setStringVal("strValue1"))) - .addFields(Field.newBuilder().setValue(Value.newBuilder().setInt64Val(1001))) + .addFields( + Field.newBuilder() + .setName(hash("feature_1")) + .setValue(Value.newBuilder().setStringVal("strValue1"))) + .addFields( + Field.newBuilder() + .setName(hash("feature_2")) + .setValue(Value.newBuilder().setInt64Val(1001))) .build(); p.apply(Create.of(featureRowWithDuplicatedFeatureFields)) @@ -492,8 +521,12 @@ public void shouldPopulateMissingFeatureValuesWithDefaultInstance() { FeatureRow expectedValue = FeatureRow.newBuilder() .setEventTimestamp(Timestamp.newBuilder().setSeconds(10)) - .addFields(Field.newBuilder().setValue(Value.newBuilder().setStringVal("strValue1"))) - .addFields(Field.newBuilder().setValue(Value.getDefaultInstance())) + .addFields( + Field.newBuilder() + .setName(hash("feature_1")) + .setValue(Value.newBuilder().setStringVal("strValue1"))) + .addFields( + Field.newBuilder().setName(hash("feature_2")).setValue(Value.getDefaultInstance())) .build(); p.apply(Create.of(featureRowWithDuplicatedFeatureFields)) @@ -504,4 +537,8 @@ public void shouldPopulateMissingFeatureValuesWithDefaultInstance() { byte[] actual = redisClusterCommands.get(expectedKey.toByteArray()); assertThat(actual, equalTo(expectedValue.toByteArray())); } + + private static String hash(String input) { + return Hashing.murmur3_32().hashString(input, StandardCharsets.UTF_8).toString(); + } } diff --git a/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/writer/RedisFeatureSinkTest.java b/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/writer/RedisFeatureSinkTest.java index 63ec136c5d..08c1833854 100644 --- a/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/writer/RedisFeatureSinkTest.java +++ b/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/writer/RedisFeatureSinkTest.java @@ -22,6 +22,7 @@ import 
com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.hash.Hashing; import com.google.protobuf.Timestamp; import feast.common.models.FeatureSetReference; import feast.proto.core.FeatureSetProto.EntitySpec; @@ -40,6 +41,7 @@ import io.lettuce.core.api.sync.RedisStringCommands; import io.lettuce.core.codec.ByteArrayCodec; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.*; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -134,7 +136,10 @@ public void shouldWriteToRedis() { .build(), FeatureRow.newBuilder() .setEventTimestamp(Timestamp.getDefaultInstance()) - .addFields(Field.newBuilder().setValue(Value.newBuilder().setStringVal("one"))) + .addFields( + Field.newBuilder() + .setName(hash("feature")) + .setValue(Value.newBuilder().setStringVal("one"))) .build()); kvs.put( RedisKey.newBuilder() @@ -143,7 +148,10 @@ public void shouldWriteToRedis() { .build(), FeatureRow.newBuilder() .setEventTimestamp(Timestamp.getDefaultInstance()) - .addFields(Field.newBuilder().setValue(Value.newBuilder().setStringVal("two"))) + .addFields( + Field.newBuilder() + .setName(hash("feature")) + .setValue(Value.newBuilder().setStringVal("two"))) .build()); List featureRows = @@ -193,7 +201,10 @@ public void shouldRetryFailConnection() throws InterruptedException { .build(), FeatureRow.newBuilder() .setEventTimestamp(Timestamp.getDefaultInstance()) - .addFields(Field.newBuilder().setValue(Value.newBuilder().setStringVal("one"))) + .addFields( + Field.newBuilder() + .setName(hash("feature")) + .setValue(Value.newBuilder().setStringVal("one"))) .build()); List featureRows = @@ -251,7 +262,10 @@ public void shouldProduceFailedElementIfRetryExceeded() { .build(), FeatureRow.newBuilder() .setEventTimestamp(Timestamp.getDefaultInstance()) - .addFields(Field.newBuilder().setValue(Value.newBuilder().setStringVal("one"))) + .addFields( + 
Field.newBuilder() + .setName(hash("feature")) + .setValue(Value.newBuilder().setStringVal("one"))) .build()); List featureRows = @@ -318,8 +332,14 @@ public void shouldConvertRowWithDuplicateEntitiesToValidKey() { FeatureRow expectedValue = FeatureRow.newBuilder() .setEventTimestamp(Timestamp.newBuilder().setSeconds(10)) - .addFields(Field.newBuilder().setValue(Value.newBuilder().setStringVal("strValue1"))) - .addFields(Field.newBuilder().setValue(Value.newBuilder().setInt64Val(1001))) + .addFields( + Field.newBuilder() + .setName(hash("feature_1")) + .setValue(Value.newBuilder().setStringVal("strValue1"))) + .addFields( + Field.newBuilder() + .setName(hash("feature_2")) + .setValue(Value.newBuilder().setInt64Val(1001))) .build(); p.apply(Create.of(offendingRow)).apply(redisFeatureSink.writer()); @@ -369,8 +389,14 @@ public void shouldConvertRowWithOutOfOrderFieldsToValidKey() { List expectedFields = Arrays.asList( - Field.newBuilder().setValue(Value.newBuilder().setStringVal("strValue1")).build(), - Field.newBuilder().setValue(Value.newBuilder().setInt64Val(1001)).build()); + Field.newBuilder() + .setName(hash("feature_1")) + .setValue(Value.newBuilder().setStringVal("strValue1")) + .build(), + Field.newBuilder() + .setName(hash("feature_2")) + .setValue(Value.newBuilder().setInt64Val(1001)) + .build()); FeatureRow expectedValue = FeatureRow.newBuilder() .setEventTimestamp(Timestamp.newBuilder().setSeconds(10)) @@ -429,8 +455,14 @@ public void shouldMergeDuplicateFeatureFields() { FeatureRow expectedValue = FeatureRow.newBuilder() .setEventTimestamp(Timestamp.newBuilder().setSeconds(10)) - .addFields(Field.newBuilder().setValue(Value.newBuilder().setStringVal("strValue1"))) - .addFields(Field.newBuilder().setValue(Value.newBuilder().setInt64Val(1001))) + .addFields( + Field.newBuilder() + .setName(hash("feature_1")) + .setValue(Value.newBuilder().setStringVal("strValue1"))) + .addFields( + Field.newBuilder() + .setName(hash("feature_2")) + 
.setValue(Value.newBuilder().setInt64Val(1001))) .build(); p.apply(Create.of(featureRowWithDuplicatedFeatureFields)).apply(redisFeatureSink.writer()); @@ -477,8 +509,12 @@ public void shouldPopulateMissingFeatureValuesWithDefaultInstance() { FeatureRow expectedValue = FeatureRow.newBuilder() .setEventTimestamp(Timestamp.newBuilder().setSeconds(10)) - .addFields(Field.newBuilder().setValue(Value.newBuilder().setStringVal("strValue1"))) - .addFields(Field.newBuilder().setValue(Value.getDefaultInstance())) + .addFields( + Field.newBuilder() + .setName(hash("feature_1")) + .setValue(Value.newBuilder().setStringVal("strValue1"))) + .addFields( + Field.newBuilder().setName(hash("feature_2")).setValue(Value.getDefaultInstance())) .build(); p.apply(Create.of(featureRowWithDuplicatedFeatureFields)).apply(redisFeatureSink.writer()); @@ -488,4 +524,8 @@ public void shouldPopulateMissingFeatureValuesWithDefaultInstance() { byte[] actual = sync.get(expectedKey.toByteArray()); assertThat(actual, equalTo(expectedValue.toByteArray())); } + + private static String hash(String input) { + return Hashing.murmur3_32().hashString(input, StandardCharsets.UTF_8).toString(); + } } From 3e1a4d0f0577f4cb35ed822f9dd8074288e79eaa Mon Sep 17 00:00:00 2001 From: Zhu Zhanyan Date: Thu, 30 Jul 2020 19:33:33 +0800 Subject: [PATCH 13/16] Fix ImportJobTest java unit test --- ingestion/src/test/java/feast/ingestion/ImportJobTest.java | 4 +++- ingestion/src/test/java/feast/test/TestUtil.java | 5 +++++ .../main/java/feast/storage/common/testing/TestUtil.java | 6 ++++++ .../redis/writer/RedisClusterFeatureSinkTest.java | 7 +------ .../connectors/redis/writer/RedisFeatureSinkTest.java | 7 +------ 5 files changed, 16 insertions(+), 13 deletions(-) diff --git a/ingestion/src/test/java/feast/ingestion/ImportJobTest.java b/ingestion/src/test/java/feast/ingestion/ImportJobTest.java index 1cfd29b541..f775bc31bb 100644 --- a/ingestion/src/test/java/feast/ingestion/ImportJobTest.java +++ 
b/ingestion/src/test/java/feast/ingestion/ImportJobTest.java @@ -217,7 +217,9 @@ public void runPipeline_ShouldWriteToRedisCorrectlyGivenValidSpecAndFeatureRow() .map(FeatureSpec::getName) .collect(Collectors.toList()) .contains(field.getName())) - .map(field -> field.toBuilder().clearName().build()) + .map( + field -> + field.toBuilder().setName(TestUtil.hash(field.getName())).build()) .collect(Collectors.toList()); randomRow = randomRow diff --git a/ingestion/src/test/java/feast/test/TestUtil.java b/ingestion/src/test/java/feast/test/TestUtil.java index b003137846..1fb8ea89ea 100644 --- a/ingestion/src/test/java/feast/test/TestUtil.java +++ b/ingestion/src/test/java/feast/test/TestUtil.java @@ -19,6 +19,7 @@ import static feast.common.models.FeatureSet.getFeatureSetStringRef; import com.google.common.collect.ImmutableList; +import com.google.common.hash.Hashing; import com.google.common.io.Files; import com.google.protobuf.ByteString; import com.google.protobuf.Message; @@ -517,4 +518,8 @@ public static void waitUntilAllElementsAreWrittenToStore( } } } + + public static String hash(String input) { + return Hashing.murmur3_32().hashString(input, StandardCharsets.UTF_8).toString(); + } } diff --git a/storage/api/src/main/java/feast/storage/common/testing/TestUtil.java b/storage/api/src/main/java/feast/storage/common/testing/TestUtil.java index 5f191d276c..773abd57d6 100644 --- a/storage/api/src/main/java/feast/storage/common/testing/TestUtil.java +++ b/storage/api/src/main/java/feast/storage/common/testing/TestUtil.java @@ -16,6 +16,7 @@ */ package feast.storage.common.testing; +import com.google.common.hash.Hashing; import com.google.protobuf.ByteString; import com.google.protobuf.Timestamp; import feast.proto.core.FeatureSetProto.FeatureSet; @@ -24,6 +25,7 @@ import feast.proto.types.FeatureRowProto.FeatureRow.Builder; import feast.proto.types.FieldProto.Field; import feast.proto.types.ValueProto.*; +import java.nio.charset.StandardCharsets; import 
java.time.Instant; import java.util.concurrent.ThreadLocalRandom; import org.apache.commons.lang3.RandomStringUtils; @@ -191,4 +193,8 @@ public static Field field(String name, Object value, ValueType.Enum valueType) { throw new IllegalStateException("Unexpected valueType: " + value.getClass()); } } + + public static String hash(String input) { + return Hashing.murmur3_32().hashString(input, StandardCharsets.UTF_8).toString(); + } } diff --git a/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/writer/RedisClusterFeatureSinkTest.java b/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/writer/RedisClusterFeatureSinkTest.java index 930d5ae7c4..62ddfff3a7 100644 --- a/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/writer/RedisClusterFeatureSinkTest.java +++ b/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/writer/RedisClusterFeatureSinkTest.java @@ -17,12 +17,12 @@ package feast.storage.connectors.redis.writer; import static feast.storage.common.testing.TestUtil.field; +import static feast.storage.common.testing.TestUtil.hash; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.hash.Hashing; import com.google.protobuf.Timestamp; import feast.common.models.FeatureSetReference; import feast.proto.core.FeatureSetProto.EntitySpec; @@ -41,7 +41,6 @@ import io.lettuce.core.codec.ByteArrayCodec; import java.io.File; import java.io.IOException; -import java.nio.charset.StandardCharsets; import java.nio.file.Paths; import java.util.*; import java.util.concurrent.ScheduledFuture; @@ -537,8 +536,4 @@ public void shouldPopulateMissingFeatureValuesWithDefaultInstance() { byte[] actual = redisClusterCommands.get(expectedKey.toByteArray()); assertThat(actual, equalTo(expectedValue.toByteArray())); } - - private static String 
hash(String input) { - return Hashing.murmur3_32().hashString(input, StandardCharsets.UTF_8).toString(); - } } diff --git a/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/writer/RedisFeatureSinkTest.java b/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/writer/RedisFeatureSinkTest.java index 08c1833854..948b8d0fda 100644 --- a/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/writer/RedisFeatureSinkTest.java +++ b/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/writer/RedisFeatureSinkTest.java @@ -17,12 +17,12 @@ package feast.storage.connectors.redis.writer; import static feast.storage.common.testing.TestUtil.field; +import static feast.storage.common.testing.TestUtil.hash; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.hash.Hashing; import com.google.protobuf.Timestamp; import feast.common.models.FeatureSetReference; import feast.proto.core.FeatureSetProto.EntitySpec; @@ -41,7 +41,6 @@ import io.lettuce.core.api.sync.RedisStringCommands; import io.lettuce.core.codec.ByteArrayCodec; import java.io.IOException; -import java.nio.charset.StandardCharsets; import java.util.*; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -524,8 +523,4 @@ public void shouldPopulateMissingFeatureValuesWithDefaultInstance() { byte[] actual = sync.get(expectedKey.toByteArray()); assertThat(actual, equalTo(expectedValue.toByteArray())); } - - private static String hash(String input) { - return Hashing.murmur3_32().hashString(input, StandardCharsets.UTF_8).toString(); - } } From 506f669b4e7ba1b46e446538869eddb8d9e79714 Mon Sep 17 00:00:00 2001 From: Zhu Zhanyan Date: Sat, 1 Aug 2020 10:20:20 +0800 Subject: [PATCH 14/16] Sync github workflows with master --- 
.github/workflows/complete.yml | 38 +++++++++++++++---------------- .github/workflows/master_only.yml | 38 ++++++++++++++----------------- 2 files changed, 36 insertions(+), 40 deletions(-) diff --git a/.github/workflows/complete.yml b/.github/workflows/complete.yml index 28363e469c..bb809f2f01 100644 --- a/.github/workflows/complete.yml +++ b/.github/workflows/complete.yml @@ -1,10 +1,10 @@ -name: complete +name: complete workflow on: [push, pull_request] jobs: build-push-docker-images: - runs-on: [self-hosted] + runs-on: ubuntu-latest strategy: matrix: component: [core, serving] @@ -15,12 +15,12 @@ jobs: - uses: actions/checkout@v2 - uses: GoogleCloudPlatform/github-actions/setup-gcloud@master with: - version: '290.0.1' - export_default_credentials: true - - run: gcloud auth configure-docker --quiet - - name: Build image + version: '270.0.0' + service_account_key: ${{ secrets.GCR_SERVICE_ACCOUNT }} + - run: gcloud auth configure-docker + - name: build image run: make build-${{ matrix.component }}-docker REGISTRY=${REGISTRY} VERSION=${GITHUB_SHA} - - name: Push image + - name: push image run: | docker push ${REGISTRY}/feast-${{ matrix.component }}:${GITHUB_SHA} if [ -n "${GITHUB_PR_SHA}" ]; then @@ -33,7 +33,7 @@ jobs: runs-on: [ubuntu-latest] steps: - uses: actions/checkout@v2 - - name: Lint java + - name: lint java run: make lint-java lint-python: @@ -41,11 +41,11 @@ jobs: runs-on: [ubuntu-latest] steps: - uses: actions/checkout@v2 - - name: Install dependencies + - name: install dependencies run: make install-python-ci-dependencies - - name: Compile protos + - name: compile protos run: make compile-protos-python - - name: Lint python + - name: lint python run: make lint-python lint-go: @@ -53,9 +53,9 @@ jobs: runs-on: [ubuntu-latest] steps: - uses: actions/checkout@v2 - - name: Install dependencies + - name: install dependencies run: make install-go-ci-dependencies - - name: Lint go + - name: lint go run: make lint-go lint-versions: @@ -77,7 +77,7 @@ jobs: key: 
${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }} restore-keys: | ${{ runner.os }}-maven- - - name: Test java + - name: test java run: make test-java-with-coverage - uses: actions/upload-artifact@v2 with: @@ -89,9 +89,9 @@ jobs: container: gcr.io/kf-feast/feast-ci:latest steps: - uses: actions/checkout@v2 - - name: Install python + - name: install python run: make install-python - - name: Test python + - name: test python run: make test-python unit-test-go: @@ -99,9 +99,9 @@ jobs: container: gcr.io/kf-feast/feast-ci:latest steps: - uses: actions/checkout@v2 - - name: Install dependencies + - name: install dependencies run: make compile-protos-go - - name: Test go + - name: test go run: make test-go integration-test: @@ -130,4 +130,4 @@ jobs: - uses: actions/upload-artifact@v2 with: name: load-test-results - path: load-test-output/ + path: load-test-output/ \ No newline at end of file diff --git a/.github/workflows/master_only.yml b/.github/workflows/master_only.yml index 7754838867..b2620c4e8d 100644 --- a/.github/workflows/master_only.yml +++ b/.github/workflows/master_only.yml @@ -8,47 +8,43 @@ on: jobs: build-docker-images: - runs-on: [self-hosted] + runs-on: [self-hosted, builder] strategy: matrix: component: [core, serving, jupyter, ci] steps: - uses: actions/checkout@v2 - - uses: GoogleCloudPlatform/github-actions/setup-gcloud@master - with: - version: '290.0.1' - export_default_credentials: true - - run: gcloud auth configure-docker --quiet - - name: Build image + - name: build image run: make build-${{ matrix.component }}-docker REGISTRY=gcr.io/kf-feast VERSION=${GITHUB_SHA} - - name: Push image + - name: push image run: make push-${{ matrix.component }}-docker REGISTRY=gcr.io/kf-feast VERSION=${GITHUB_SHA} - - name: Push image to feast dev + - name: push feast dev run: | if [ ${GITHUB_REF#refs/*/} == "master" ]; then docker tag gcr.io/kf-feast/feast-${{ matrix.component }}:${GITHUB_SHA} gcr.io/kf-feast/feast-${{ matrix.component }}:dev docker push 
gcr.io/kf-feast/feast-${{ matrix.component }}:dev fi - - name: Get version - run: echo ::set-env name=RELEASE_VERSION::${GITHUB_REF#refs/*/} - - name: Push versioned release + - name: get version + id: get_version + run: echo ::set-output name=VERSION::${${GITHUB_REF/refs\/tags\//}:1} + - name: push versioned release run: | + # Build and push semver tagged commits - rx='^v[0-9]+?\.[0-9]+?\.[0-9]+?$' - if [[ "${RELEASE_VERSION}" =~ $rx ]]; then - VERSION_WITHOUT_PREFIX=${RELEASE_VERSION:1} + rx='^([0-9]+\.){0,2}(\*|[0-9]+)$' + if [[ ${{ steps.get_version.outputs.VERSION }} =~ $rx ]]; then - docker tag gcr.io/kf-feast/feast-${{ matrix.component }}:${GITHUB_SHA} gcr.io/kf-feast/feast-${{ matrix.component }}:${VERSION_WITHOUT_PREFIX} - docker push gcr.io/kf-feast/feast-${{ matrix.component }}:${VERSION_WITHOUT_PREFIX} + docker tag gcr.io/kf-feast/feast-${{ matrix.component }}:${GITHUB_SHA} gcr.io/kf-feast/feast-${{ matrix.component }}:${{ steps.get_version.outputs.VERSION }} + docker push gcr.io/kf-feast/feast-${{ matrix.component }}:${{ steps.get_version.outputs.VERSION }} # Also update "latest" image if tagged commit is pushed to stable branch HIGHEST_SEMVER_TAG=$(git tag -l --sort -version:refname | head -n 1) echo "Only push to latest tag if tag is the highest semver version $HIGHEST_SEMVER_TAG" - if [ "${VERSION_WITHOUT_PREFIX}" == "${HIGHEST_SEMVER_TAG:1}" ] + if [ ${{ steps.get_version.outputs.VERSION }} == "${HIGHEST_SEMVER_TAG:1}" ] then - docker tag gcr.io/kf-feast/feast-${{ matrix.component }}:${GITHUB_SHA} gcr.io/kf-feast/feast-${{ matrix.component }}:latest - docker push gcr.io/kf-feast/feast-${{ matrix.component }}:latest + docker tag gcr.io/kf-feast/feast-${{ matrix.component }}:${GITHUB_SHA} gcr.io/kf-feast/feast-${{ matrix.component }}:${{ steps.get_version.outputs.VERSION }} + docker push gcr.io/kf-feast/feast-${{ matrix.component }}:${{ steps.get_version.outputs.VERSION }} fi fi @@ -56,5 +52,5 @@ jobs: runs-on: ubuntu-latest steps: - uses: 
actions/checkout@v2 - - name: Test docker compose + - name: test docker compose run: ./infra/scripts/test-docker-compose.sh From c122768ce45529fde7f1692ab65179f6c5ec74bb Mon Sep 17 00:00:00 2001 From: Zhu Zhanyan Date: Sat, 1 Aug 2020 10:35:16 +0800 Subject: [PATCH 15/16] Sync .github folder with master for fix --- .github/workflows/complete.yml | 38 +++++++++++++++---------------- .github/workflows/master_only.yml | 38 +++++++++++++++++-------------- 2 files changed, 40 insertions(+), 36 deletions(-) diff --git a/.github/workflows/complete.yml b/.github/workflows/complete.yml index bb809f2f01..28363e469c 100644 --- a/.github/workflows/complete.yml +++ b/.github/workflows/complete.yml @@ -1,10 +1,10 @@ -name: complete workflow +name: complete on: [push, pull_request] jobs: build-push-docker-images: - runs-on: ubuntu-latest + runs-on: [self-hosted] strategy: matrix: component: [core, serving] @@ -15,12 +15,12 @@ jobs: - uses: actions/checkout@v2 - uses: GoogleCloudPlatform/github-actions/setup-gcloud@master with: - version: '270.0.0' - service_account_key: ${{ secrets.GCR_SERVICE_ACCOUNT }} - - run: gcloud auth configure-docker - - name: build image + version: '290.0.1' + export_default_credentials: true + - run: gcloud auth configure-docker --quiet + - name: Build image run: make build-${{ matrix.component }}-docker REGISTRY=${REGISTRY} VERSION=${GITHUB_SHA} - - name: push image + - name: Push image run: | docker push ${REGISTRY}/feast-${{ matrix.component }}:${GITHUB_SHA} if [ -n "${GITHUB_PR_SHA}" ]; then @@ -33,7 +33,7 @@ jobs: runs-on: [ubuntu-latest] steps: - uses: actions/checkout@v2 - - name: lint java + - name: Lint java run: make lint-java lint-python: @@ -41,11 +41,11 @@ jobs: runs-on: [ubuntu-latest] steps: - uses: actions/checkout@v2 - - name: install dependencies + - name: Install dependencies run: make install-python-ci-dependencies - - name: compile protos + - name: Compile protos run: make compile-protos-python - - name: lint python + - name: Lint 
python run: make lint-python lint-go: @@ -53,9 +53,9 @@ jobs: runs-on: [ubuntu-latest] steps: - uses: actions/checkout@v2 - - name: install dependencies + - name: Install dependencies run: make install-go-ci-dependencies - - name: lint go + - name: Lint go run: make lint-go lint-versions: @@ -77,7 +77,7 @@ jobs: key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }} restore-keys: | ${{ runner.os }}-maven- - - name: test java + - name: Test java run: make test-java-with-coverage - uses: actions/upload-artifact@v2 with: @@ -89,9 +89,9 @@ jobs: container: gcr.io/kf-feast/feast-ci:latest steps: - uses: actions/checkout@v2 - - name: install python + - name: Install python run: make install-python - - name: test python + - name: Test python run: make test-python unit-test-go: @@ -99,9 +99,9 @@ jobs: container: gcr.io/kf-feast/feast-ci:latest steps: - uses: actions/checkout@v2 - - name: install dependencies + - name: Install dependencies run: make compile-protos-go - - name: test go + - name: Test go run: make test-go integration-test: @@ -130,4 +130,4 @@ jobs: - uses: actions/upload-artifact@v2 with: name: load-test-results - path: load-test-output/ \ No newline at end of file + path: load-test-output/ diff --git a/.github/workflows/master_only.yml b/.github/workflows/master_only.yml index b2620c4e8d..7754838867 100644 --- a/.github/workflows/master_only.yml +++ b/.github/workflows/master_only.yml @@ -8,43 +8,47 @@ on: jobs: build-docker-images: - runs-on: [self-hosted, builder] + runs-on: [self-hosted] strategy: matrix: component: [core, serving, jupyter, ci] steps: - uses: actions/checkout@v2 - - name: build image + - uses: GoogleCloudPlatform/github-actions/setup-gcloud@master + with: + version: '290.0.1' + export_default_credentials: true + - run: gcloud auth configure-docker --quiet + - name: Build image run: make build-${{ matrix.component }}-docker REGISTRY=gcr.io/kf-feast VERSION=${GITHUB_SHA} - - name: push image + - name: Push image run: make push-${{ 
matrix.component }}-docker REGISTRY=gcr.io/kf-feast VERSION=${GITHUB_SHA} - - name: push feast dev + - name: Push image to feast dev run: | if [ ${GITHUB_REF#refs/*/} == "master" ]; then docker tag gcr.io/kf-feast/feast-${{ matrix.component }}:${GITHUB_SHA} gcr.io/kf-feast/feast-${{ matrix.component }}:dev docker push gcr.io/kf-feast/feast-${{ matrix.component }}:dev fi - - name: get version - id: get_version - run: echo ::set-output name=VERSION::${${GITHUB_REF/refs\/tags\//}:1} - - name: push versioned release + - name: Get version + run: echo ::set-env name=RELEASE_VERSION::${GITHUB_REF#refs/*/} + - name: Push versioned release run: | - # Build and push semver tagged commits - rx='^([0-9]+\.){0,2}(\*|[0-9]+)$' - if [[ ${{ steps.get_version.outputs.VERSION }} =~ $rx ]]; then + rx='^v[0-9]+?\.[0-9]+?\.[0-9]+?$' + if [[ "${RELEASE_VERSION}" =~ $rx ]]; then + VERSION_WITHOUT_PREFIX=${RELEASE_VERSION:1} - docker tag gcr.io/kf-feast/feast-${{ matrix.component }}:${GITHUB_SHA} gcr.io/kf-feast/feast-${{ matrix.component }}:${{ steps.get_version.outputs.VERSION }} - docker push gcr.io/kf-feast/feast-${{ matrix.component }}:${{ steps.get_version.outputs.VERSION }} + docker tag gcr.io/kf-feast/feast-${{ matrix.component }}:${GITHUB_SHA} gcr.io/kf-feast/feast-${{ matrix.component }}:${VERSION_WITHOUT_PREFIX} + docker push gcr.io/kf-feast/feast-${{ matrix.component }}:${VERSION_WITHOUT_PREFIX} # Also update "latest" image if tagged commit is pushed to stable branch HIGHEST_SEMVER_TAG=$(git tag -l --sort -version:refname | head -n 1) echo "Only push to latest tag if tag is the highest semver version $HIGHEST_SEMVER_TAG" - if [ ${{ steps.get_version.outputs.VERSION }} == "${HIGHEST_SEMVER_TAG:1}" ] + if [ "${VERSION_WITHOUT_PREFIX}" == "${HIGHEST_SEMVER_TAG:1}" ] then - docker tag gcr.io/kf-feast/feast-${{ matrix.component }}:${GITHUB_SHA} gcr.io/kf-feast/feast-${{ matrix.component }}:${{ steps.get_version.outputs.VERSION }} - docker push gcr.io/kf-feast/feast-${{ 
matrix.component }}:${{ steps.get_version.outputs.VERSION }} + docker tag gcr.io/kf-feast/feast-${{ matrix.component }}:${GITHUB_SHA} gcr.io/kf-feast/feast-${{ matrix.component }}:latest + docker push gcr.io/kf-feast/feast-${{ matrix.component }}:latest fi fi @@ -52,5 +56,5 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 - - name: test docker compose + - name: Test docker compose run: ./infra/scripts/test-docker-compose.sh From e11222e7a44bb4ff1433c3357f0cbf285c972dee Mon Sep 17 00:00:00 2001 From: Zhu Zhanyan Date: Sat, 1 Aug 2020 13:13:41 +0800 Subject: [PATCH 16/16] Replace v1/v2 encoding with v1/v2 decoder in docs --- .../redis/retriever/FeatureRowDecoder.java | 24 ++++++++++--------- .../redis/writer/RedisCustomIO.java | 2 +- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/FeatureRowDecoder.java b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/FeatureRowDecoder.java index d487636aa9..d89e537366 100644 --- a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/FeatureRowDecoder.java +++ b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/FeatureRowDecoder.java @@ -41,10 +41,10 @@ public FeatureRowDecoder(String featureSetRef, FeatureSetSpec spec) { } /** - * Check if encoded feature row v1 is encoded. The Feature Row v1 encoding defines that a Feature - * Row is considered encoded if both it's feature set reference and fields names are not set . The - * no. of fields in the feature row should also match up with the number of fields in the Feature - * Set spec. NOTE: This method is deprecated and will be removed in Feast v0.7. + * Check if encoded feature row can be decoded by v1 Decoder. The v1 Decoder requires that the + * Feature Row to have both it's feature set reference and fields names are not set. The no. 
of + * fields in the feature row should also match up with the number of fields in the Feature Set + * spec. NOTE: This method is deprecated and will be removed in Feast v0.7. * * @param featureRow Feature row * @return boolean @@ -57,8 +57,8 @@ private boolean isEncodedV1(FeatureRow featureRow) { } /** - * Check if encoded feature row v2 is encoded. The Feature Row v2 encoding defines that a Feature - * Row is considered encoded if it's both it feature set reference and fields names are set. + * Check if encoded feature row can be decoded by Decoder. The v2 Decoder requires that a Feature + * Row to have both it feature set reference and fields names are set. * * @param featureRow Feature row * @return boolean @@ -68,8 +68,8 @@ private boolean isEncodedV2(FeatureRow featureRow) { } /** - * Decode feature row encoded by {@link RedisCustomIO}. NOTE: support for decoding Feature Row v1 - * encoding will be dropped in Feast 0.7 + * Decode feature row encoded by {@link RedisCustomIO}. NOTE: The v1 Decoder will be removed in + * Feast 0.7 * * @throws IllegalArgumentException if unable to the decode the given feature row * @param encodedFeatureRow Feature row @@ -77,7 +77,8 @@ private boolean isEncodedV2(FeatureRow featureRow) { */ public FeatureRow decode(FeatureRow encodedFeatureRow) { if (isEncodedV1(encodedFeatureRow)) { - // TODO: remove support for v1 feature row in Feast 0.7 + // TODO: remove v1 feature row decoder in Feast 0.7 + // Decode Feature Rows using the v1 Decoder. final List fieldsWithoutName = encodedFeatureRow.getFieldsList(); List featureNames = spec.getFeaturesList().stream() @@ -106,8 +107,9 @@ public FeatureRow decode(FeatureRow encodedFeatureRow) { .build(); } if (isEncodedV2(encodedFeatureRow)) { - // Feature Row v2 encoding uses a hashed name as the field name and does not have feature set - // reference set. + // Decode Feature Rows using the v2 Decoder. 
+ // v2 Decoder input Feature Rows should use a hashed name as the field name and + // should not have feature set reference set. // Decoding reverts the field name to a unhashed string and set feature set reference. Map nameHashValueMap = encodedFeatureRow.getFieldsList().stream() diff --git a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/writer/RedisCustomIO.java b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/writer/RedisCustomIO.java index 53dcba27e9..f73c458d78 100644 --- a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/writer/RedisCustomIO.java +++ b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/writer/RedisCustomIO.java @@ -207,7 +207,7 @@ private byte[] getKey(FeatureRow featureRow, FeatureSetSpec spec) { } /** - * Encode the Feature Row as bytes to store in Redis in encoded Feature Row v2 encoding. To + * Encode the Feature Row as bytes to store in Redis in encoded Feature Row encoding. To * reduce storage space consumption in redis, feature rows are "encoded" by hashing the fields * names and not unsetting the feature set reference. {@link FeatureRowDecoder} is * rensponsible for reversing this "encoding" step.