Skip to content

Commit

Permalink
Add feature and feature set labels
Browse files Browse the repository at this point in the history
Backports feast-dev#536 to v0.3
  • Loading branch information
Suwinski, Krzysztof (Agoda) authored and ches committed May 23, 2020
1 parent 0eb9c56 commit 2a9d672
Show file tree
Hide file tree
Showing 34 changed files with 2,276 additions and 992 deletions.
30 changes: 27 additions & 3 deletions core/src/main/java/feast/core/model/FeatureSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import feast.core.FeatureSetProto.EntitySpec;
import feast.core.FeatureSetProto.FeatureSetSpec;
import feast.core.FeatureSetProto.FeatureSpec;
import feast.core.util.TypeConversion;
import feast.types.ValueProto.ValueType;
import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -79,6 +80,10 @@ public class FeatureSet extends AbstractTimestampEntity implements Comparable<Fe
@JoinColumn(name = "source")
private Source source;

// User defined metadata
@Column(name = "labels", columnDefinition = "text")
private String labels;

public FeatureSet() {
super();
}
Expand All @@ -89,22 +94,25 @@ public FeatureSet(
long maxAgeSeconds,
List<Field> entities,
List<Field> features,
Source source) {
Source source,
Map<String, String> labels) {
this.id = String.format("%s:%s", name, version);
this.name = name;
this.version = version;
this.maxAgeSeconds = maxAgeSeconds;
this.entities = entities;
this.features = features;
this.source = source;
this.labels = TypeConversion.convertMapToJsonString(labels);
}

public static FeatureSet fromProto(FeatureSetSpec featureSetSpec) {
Source source = Source.fromProto(featureSetSpec.getSource());
String id = String.format("%s:%d", featureSetSpec.getName(), featureSetSpec.getVersion());
List<Field> features = new ArrayList<>();
for (FeatureSpec feature : featureSetSpec.getFeaturesList()) {
features.add(new Field(id, feature.getName(), feature.getValueType()));
features.add(
new Field(id, feature.getName(), feature.getValueType(), feature.getLabelsMap()));
}
List<Field> entities = new ArrayList<>();
for (EntitySpec entity : featureSetSpec.getEntitiesList()) {
Expand All @@ -117,7 +125,8 @@ public static FeatureSet fromProto(FeatureSetSpec featureSetSpec) {
featureSetSpec.getMaxAge().getSeconds(),
entities,
features,
source);
source,
featureSetSpec.getLabelsMap());
}

public FeatureSetSpec toProto() throws InvalidProtocolBufferException {
Expand All @@ -136,6 +145,7 @@ public FeatureSetSpec toProto() throws InvalidProtocolBufferException {
FeatureSpec.newBuilder()
.setName(feature.getName())
.setValueType(ValueType.Enum.valueOf(feature.getType()))
.putAllLabels(feature.getLabels())
.build());
}
return FeatureSetSpec.newBuilder()
Expand All @@ -145,6 +155,7 @@ public FeatureSetSpec toProto() throws InvalidProtocolBufferException {
.addAllEntities(entitySpecs)
.addAllFeatures(featureSpecs)
.setSource(source.toProto())
.putAllLabels(TypeConversion.convertJsonStringToMap(labels))
.build();
}

Expand Down Expand Up @@ -209,4 +220,17 @@ public boolean equalTo(FeatureSet other) {
public int compareTo(FeatureSet o) {
return Integer.compare(version, o.version);
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!(obj instanceof FeatureSet)) {
return false;
}
FeatureSet other = (FeatureSet) obj;

return this.equalTo(other) && labels.equals(other.labels);
}
}
23 changes: 20 additions & 3 deletions core/src/main/java/feast/core/model/Field.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
*/
package feast.core.model;

import feast.core.util.TypeConversion;
import feast.types.ValueProto.ValueType;
import java.util.Map;
import java.util.Objects;
import javax.persistence.Column;
import javax.persistence.Entity;
Expand Down Expand Up @@ -52,18 +54,31 @@ public class Field {
@Column(name = "type", nullable = false)
private String type;

// Labels that this field belongs to
@Column(name = "labels", columnDefinition = "text")
private String labels;

public Field() {
super();
}

public Field(String featureSetId, String name, ValueType.Enum type) {
public Field(String featureSetId, String name, ValueType.Enum type, Map<String, String> labels) {
// TODO: Remove all mention of feature sets inside of this class!
FeatureSet featureSet = new FeatureSet();
featureSet.setId(featureSetId);
this.featureSet = featureSet;
this.id = String.format("%s:%s", featureSetId, name);
this.name = name;
this.type = type.toString();
this.labels = TypeConversion.convertMapToJsonString(labels);
}

public Field(String featureSetId, String name, ValueType.Enum type) {
this(featureSetId, name, type, null);
}

public Map<String, String> getLabels() {
return TypeConversion.convertJsonStringToMap(this.labels);
}

@Override
Expand All @@ -75,11 +90,13 @@ public boolean equals(Object o) {
return false;
}
Field field = (Field) o;
return name.equals(field.getName()) && type.equals(field.getType());
return name.equals(field.getName())
&& type.equals(field.getType())
&& labels.equals(field.labels);
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), id, featureSet, name, type);
return Objects.hash(super.hashCode(), id, featureSet, name, type, labels);
}
}
3 changes: 0 additions & 3 deletions core/src/main/java/feast/core/util/TypeConversion.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,6 @@ public static Map<String, String> convertJsonStringToMap(String jsonString) {
* @return json string corresponding to given map
*/
public static String convertMapToJsonString(Map<String, String> map) {
if (map.isEmpty()) {
return "{}";
}
return gson.toJson(map);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,19 @@

public class FeatureSetValidator {
public static void validateSpec(FeatureSetSpec featureSetSpec) {
if (featureSetSpec.getLabelsMap().containsKey("")) {
throw new IllegalArgumentException("Feature set label keys must not be empty");
}
checkValidCharacters(featureSetSpec.getName(), "name");
checkUniqueColumns(featureSetSpec.getEntitiesList(), featureSetSpec.getFeaturesList());
for (EntitySpec entitySpec : featureSetSpec.getEntitiesList()) {
checkValidCharacters(entitySpec.getName(), "entities::name");
}
for (FeatureSpec featureSpec : featureSetSpec.getFeaturesList()) {
checkValidCharacters(featureSpec.getName(), "features::name");
if (featureSpec.getLabelsMap().containsKey("")) {
throw new IllegalArgumentException("Feature label keys must not be empty");
}
}
}

Expand Down
141 changes: 113 additions & 28 deletions core/src/test/java/feast/core/service/SpecServiceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,9 @@
import feast.core.CoreServiceProto.UpdateStoreRequest;
import feast.core.CoreServiceProto.UpdateStoreResponse;
import feast.core.FeatureSetProto;
import feast.core.FeatureSetProto.EntitySpec;
import feast.core.FeatureSetProto.FeatureSetSpec;
import feast.core.FeatureSetProto.FeatureSpec;
import feast.core.SourceProto.KafkaSourceConfig;
import feast.core.SourceProto.SourceType;
import feast.core.StoreProto;
import feast.core.StoreProto.Store.RedisConfig;
import feast.core.StoreProto.Store.StoreType;
Expand All @@ -53,10 +52,7 @@
import feast.core.model.Store;
import feast.types.ValueProto.ValueType.Enum;
import io.grpc.StatusRuntimeException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.*;
import java.util.stream.Collectors;
import org.junit.Before;
import org.junit.Rule;
Expand All @@ -82,26 +78,18 @@ public class SpecServiceTest {
@Before
public void setUp() {
initMocks(this);
defaultSource =
new Source(
SourceType.KAFKA,
KafkaSourceConfig.newBuilder()
.setBootstrapServers("kafka:9092")
.setTopic("my-topic")
.build(),
true);
defaultSource = TestObjectFactory.defaultSource;

FeatureSet featureSet1v1 = newDummyFeatureSet("f1", 1);
FeatureSet featureSet1v2 = newDummyFeatureSet("f1", 2);
FeatureSet featureSet1v3 = newDummyFeatureSet("f1", 3);
FeatureSet featureSet2v1 = newDummyFeatureSet("f2", 1);

Field f3f1 = new Field("f3", "f3f1", Enum.INT64);
Field f3f2 = new Field("f3", "f3f2", Enum.INT64);
Field f3e1 = new Field("f3", "f3e1", Enum.STRING);
Field f3f1 = TestObjectFactory.CreateFeatureField("f3", "f3f1", Enum.INT64);
Field f3f2 = TestObjectFactory.CreateFeatureField("f3", "f3f2", Enum.INT64);
Field f3e1 = TestObjectFactory.CreateEntityField("f3", "f3e1", Enum.STRING);
FeatureSet featureSet3v1 =
new FeatureSet(
"f3", 1, 100L, Arrays.asList(f3e1), Arrays.asList(f3f2, f3f1), defaultSource);
TestObjectFactory.CreateFeatureSet("f3", 1, Arrays.asList(f3e1), Arrays.asList(f3f2, f3f1));

featureSets =
Arrays.asList(featureSet1v1, featureSet1v2, featureSet1v3, featureSet2v1, featureSet3v1);
Expand Down Expand Up @@ -349,12 +337,12 @@ public void applyFeatureSetShouldIncrementFeatureSetVersionIfAlreadyExists()
public void applyFeatureSetShouldNotCreateFeatureSetIfFieldsUnordered()
throws InvalidProtocolBufferException {

Field f3f1 = new Field("f3", "f3f1", Enum.INT64);
Field f3f2 = new Field("f3", "f3f2", Enum.INT64);
Field f3e1 = new Field("f3", "f3e1", Enum.STRING);
Field f3f1 = TestObjectFactory.CreateFeatureField("f3", "f3f1", Enum.INT64);
Field f3f2 = TestObjectFactory.CreateFeatureField("f3", "f3f2", Enum.INT64);
Field f3e1 = TestObjectFactory.CreateEntityField("f3", "f3e1", Enum.STRING);
FeatureSetProto.FeatureSetSpec incomingFeatureSet =
(new FeatureSet(
"f3", 5, 100L, Arrays.asList(f3e1), Arrays.asList(f3f2, f3f1), defaultSource))
(TestObjectFactory.CreateFeatureSet(
"f3", 5, Arrays.asList(f3e1), Arrays.asList(f3f2, f3f1)))
.toProto();

FeatureSetSpec expected = incomingFeatureSet;
Expand All @@ -367,6 +355,100 @@ public void applyFeatureSetShouldNotCreateFeatureSetIfFieldsUnordered()
assertThat(applyFeatureSetResponse.getFeatureSet().getName(), equalTo(expected.getName()));
}

@Test
public void applyFeatureSetShouldAcceptFeatureLabels() throws InvalidProtocolBufferException {
List<FeatureSetProto.EntitySpec> entitySpecs = new ArrayList<>();
entitySpecs.add(
FeatureSetProto.EntitySpec.newBuilder()
.setName("entity1")
.setValueType(Enum.INT64)
.build());

Map<String, String> featureLabels0 =
new HashMap<String, String>() {
{
put("label1", "feast1");
}
};

Map<String, String> featureLabels1 =
new HashMap<String, String>() {
{
put("label1", "feast1");
put("label2", "feast2");
}
};

List<Map<String, String>> featureLabels = new ArrayList<>();
featureLabels.add(featureLabels0);
featureLabels.add(featureLabels1);

List<FeatureSpec> featureSpecs = new ArrayList<>();
featureSpecs.add(
FeatureSpec.newBuilder()
.setName("feature1")
.setValueType(Enum.INT64)
.putAllLabels(featureLabels.get(0))
.build());
featureSpecs.add(
FeatureSpec.newBuilder()
.setName("feature2")
.setValueType(Enum.INT64)
.putAllLabels(featureLabels.get(1))
.build());

FeatureSetSpec featureSetSpec =
FeatureSetSpec.newBuilder()
.setName("featureSetWithConstraints")
.addAllEntities(entitySpecs)
.addAllFeatures(featureSpecs)
.build();

ApplyFeatureSetResponse applyFeatureSetResponse = specService.applyFeatureSet(featureSetSpec);
FeatureSetSpec appliedFeatureSetSpec = applyFeatureSetResponse.getFeatureSet();

// appliedEntitySpecs needs to be sorted because the list returned by specService may not
// follow the order in the request
List<FeatureSetProto.EntitySpec> appliedEntitySpecs =
new ArrayList<>(appliedFeatureSetSpec.getEntitiesList());
appliedEntitySpecs.sort(Comparator.comparing(EntitySpec::getName));

// appliedFeatureSpecs needs to be sorted because the list returned by specService may not
// follow the order in the request
List<FeatureSpec> appliedFeatureSpecs =
new ArrayList<>(appliedFeatureSetSpec.getFeaturesList());
appliedFeatureSpecs.sort(Comparator.comparing(FeatureSpec::getName));

List<Map<String, String>> featureSpecsLabels =
featureSpecs.stream().map(e -> e.getLabelsMap()).collect(Collectors.toList());
assertThat(appliedEntitySpecs, equalTo(entitySpecs));
assertThat(appliedFeatureSpecs, equalTo(featureSpecs));
assertThat(featureSpecsLabels, equalTo(featureLabels));
}

@Test
public void applyFeatureSetShouldAcceptFeatureSetLabels() throws InvalidProtocolBufferException {
Map<String, String> featureSetLabels =
new HashMap<String, String>() {
{
put("description", "My precious feature set");
}
};

FeatureSetSpec featureSetSpec =
FeatureSetSpec.newBuilder()
.setName("preciousFeatureSet")
.putAllLabels(featureSetLabels)
.build();

ApplyFeatureSetResponse applyFeatureSetResponse = specService.applyFeatureSet(featureSetSpec);
FeatureSetSpec appliedFeatureSetSpec = applyFeatureSetResponse.getFeatureSet();

Map<String, String> appliedLabels = appliedFeatureSetSpec.getLabelsMap();

assertThat(appliedLabels, equalTo(featureSetLabels));
}

@Test
public void shouldUpdateStoreIfConfigChanges() throws InvalidProtocolBufferException {
when(storeRepository.findById("SERVING")).thenReturn(Optional.of(stores.get(0)));
Expand Down Expand Up @@ -406,10 +488,13 @@ public void shouldDoNothingIfNoChange() throws InvalidProtocolBufferException {
}

private FeatureSet newDummyFeatureSet(String name, int version) {
Field feature = new Field(name, "feature", Enum.INT64);
Field entity = new Field(name, "entity", Enum.STRING);
return new FeatureSet(
name, version, 100L, Arrays.asList(entity), Arrays.asList(feature), defaultSource);
Field feature = TestObjectFactory.CreateFeatureField(name, "feature", Enum.INT64);
Field entity = TestObjectFactory.CreateEntityField(name, "entity", Enum.STRING);

FeatureSet fs =
TestObjectFactory.CreateFeatureSet(
name, version, Arrays.asList(entity), Arrays.asList(feature));
return fs;
}

private Store newDummyStore(String name) {
Expand Down
Loading

0 comments on commit 2a9d672

Please sign in to comment.