Skip to content

Commit

Permalink
Bug Fixes: Python SDK and Feast Core (#353)
Browse files Browse the repository at this point in the history
* Remove error handling in version and connect methods for Feast Python SDK client

* Relax protobuf version constraint

* Fix event_time column rename bug

* Allow get_feature_set to raise an exception if no feature set found

* Add ordering to feature set lists and fix get_feature_set error message

* Python SDK: Always update local feature set after apply, and always update source

* * Resolve bug with ApplyFeatureSet where it would raise an Unknown exception due to a null source type
* Resolve an issue where feature sets with a changing order of fields would increment feature set versions

* Python SDK: Only return version information when url is set

* Fixed bug in version() method with assignment

* Remove exception handling from equalTo method for Feature Set
  • Loading branch information
woop authored and feast-ci-bot committed Dec 10, 2019
1 parent e4f2756 commit c36c6f5
Show file tree
Hide file tree
Showing 12 changed files with 258 additions and 199 deletions.
9 changes: 7 additions & 2 deletions core/src/main/java/feast/core/dao/FeatureSetRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ public interface FeatureSetRepository extends JpaRepository<FeatureSet, String>
List<FeatureSet> findByName(String name);

// find all versions of featureSets with names matching the regex
@Query(nativeQuery = true, value = "SELECT * FROM feature_sets WHERE name LIKE ?1")
List<FeatureSet> findByNameWithWildcard(String name);
@Query(nativeQuery = true, value = "SELECT * FROM feature_sets "
+ "WHERE name LIKE ?1 ORDER BY name ASC, version ASC")
List<FeatureSet> findByNameWithWildcardOrderByNameAscVersionAsc(String name);

// find all feature sets and order by name and version
List<FeatureSet> findAllByOrderByNameAscVersionAsc();

}
7 changes: 5 additions & 2 deletions core/src/main/java/feast/core/grpc/CoreServiceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import feast.core.grpc.interceptors.MonitoringInterceptor;
import feast.core.service.JobCoordinatorService;
import feast.core.service.SpecService;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.util.HashSet;
import java.util.Set;
Expand All @@ -50,7 +51,9 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Transactional;

/** Implementation of the feast core GRPC service. */
/**
* Implementation of the feast core GRPC service.
*/
@Slf4j
@GRpcService(interceptors = {MonitoringInterceptor.class})
public class CoreServiceImpl extends CoreServiceImplBase {
Expand Down Expand Up @@ -78,7 +81,7 @@ public void getFeatureSet(
GetFeatureSetResponse response = specService.getFeatureSet(request);
responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (RetrievalException | InvalidProtocolBufferException e) {
} catch (RetrievalException | InvalidProtocolBufferException | StatusRuntimeException e) {
log.error("Exception has occurred in GetFeatureSet method: ", e);
responseObserver.onError(e);
}
Expand Down
58 changes: 52 additions & 6 deletions core/src/main/java/feast/core/model/FeatureSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import feast.core.FeatureSetProto.FeatureSpec;
import feast.types.ValueProto.ValueType;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.persistence.CascadeType;
import javax.persistence.Column;
import javax.persistence.Entity;
Expand Down Expand Up @@ -152,12 +154,56 @@ public FeatureSetSpec toProto() throws InvalidProtocolBufferException {
* @param other FeatureSet to compare to
* @return boolean denoting if the source or schema have changed.
*/
public boolean equalTo(FeatureSet other) throws InvalidProtocolBufferException {
return name.equals(other.getName())
&& entities.equals(other.entities)
&& features.equals(other.features)
&& source.equalTo(other.getSource())
&& maxAgeSeconds == other.maxAgeSeconds;
public boolean equalTo(FeatureSet other) {
if(!name.equals(other.getName())){
return false;
}

if (!source.equalTo(other.getSource())){
return false;
}

if (maxAgeSeconds != other.maxAgeSeconds){
return false;
}

// Create a map of all fields in this feature set
Map<String, Field> fields = new HashMap<>();

for (Field e : entities){
fields.putIfAbsent(e.getName(), e);
}

for (Field f : features){
fields.putIfAbsent(f.getName(), f);
}

// Ensure map size is consistent with existing fields
if (fields.size() != other.features.size() + other.entities.size())
{
return false;
}

// Ensure the other entities and fields exist in the field map
for (Field e : other.entities){
if(!fields.containsKey(e.getName())){
return false;
}
if (!e.equals(fields.get(e.getName()))){
return false;
}
}

for (Field f : features){
if(!fields.containsKey(f.getName())){
return false;
}
if (!f.equals(fields.get(f.getName()))){
return false;
}
}

return true;
}

@Override
Expand Down
1 change: 1 addition & 0 deletions core/src/main/java/feast/core/model/Field.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public Field() {
}

public Field(String featureSetId, String name, ValueType.Enum type) {
// TODO: Remove all mention of feature sets inside of this class!
FeatureSet featureSet = new FeatureSet();
featureSet.setId(featureSetId);
this.featureSet = featureSet;
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/feast/core/model/Source.java
Original file line number Diff line number Diff line change
Expand Up @@ -173,11 +173,11 @@ public boolean isDefault() {
* @return boolean equal
*/
public boolean equalTo(Source other) {
if (other.isDefault && isDefault) {
if (other.isDefault && isDefault || (type == null && other.type == null)) {
return true;
}

if (!type.equals(other.type)) {
if ((type == null || !type.equals(other.type))) {
return false;
}

Expand Down
24 changes: 17 additions & 7 deletions core/src/main/java/feast/core/service/SpecService.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,18 +107,27 @@ public GetFeatureSetResponse getFeatureSet(GetFeatureSetRequest request)
if (request.getVersion() == 0) {
featureSet =
featureSetRepository.findFirstFeatureSetByNameOrderByVersionDesc(request.getName());

if (featureSet == null) {
throw io.grpc.Status.NOT_FOUND
.withDescription(String.format("Feature set with name \"%s\" could not be found.",
request.getName()))
.asRuntimeException();
}
} else {
featureSet =
featureSetRepository.findFeatureSetByNameAndVersion(
request.getName(), request.getVersion());
}

if (featureSet == null) {
throw io.grpc.Status.NOT_FOUND
.withDescription("Feature set could not be found")
.asRuntimeException();
if (featureSet == null) {
throw io.grpc.Status.NOT_FOUND
.withDescription(String.format("Feature set with name \"%s\" and version \"%s\" could "
+ "not be found.", request.getName(), request.getVersion()))
.asRuntimeException();
}
}


// Only a single item in list, return successfully
return GetFeatureSetResponse.newBuilder().setFeatureSet(featureSet.toProto()).build();
}
Expand All @@ -143,9 +152,9 @@ public ListFeatureSetsResponse listFeatureSets(ListFeatureSetsRequest.Filter fil
checkValidFeatureSetFilterName(name, "featureSetName");
List<FeatureSet> featureSets;
if (name.equals("")) {
featureSets = featureSetRepository.findAll();
featureSets = featureSetRepository.findAllByOrderByNameAscVersionAsc();
} else {
featureSets = featureSetRepository.findByNameWithWildcard(name.replace('*', '%'));
featureSets = featureSetRepository.findByNameWithWildcardOrderByNameAscVersionAsc(name.replace('*', '%'));
featureSets =
featureSets.stream()
.filter(getVersionFilter(filter.getFeatureSetVersion()))
Expand Down Expand Up @@ -208,6 +217,7 @@ public ApplyFeatureSetResponse applyFeatureSet(FeatureSetSpec newFeatureSetSpec)
FeatureSetValidator.validateSpec(newFeatureSetSpec);
List<FeatureSet> existingFeatureSets =
featureSetRepository.findByName(newFeatureSetSpec.getName());

if (existingFeatureSets.size() == 0) {
newFeatureSetSpec = newFeatureSetSpec.toBuilder().setVersion(1).build();
} else {
Expand Down
52 changes: 45 additions & 7 deletions core/src/test/java/feast/core/service/SpecServiceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import feast.core.CoreServiceProto.ListStoresResponse;
import feast.core.CoreServiceProto.UpdateStoreRequest;
import feast.core.CoreServiceProto.UpdateStoreResponse;
import feast.core.FeatureSetProto;
import feast.core.FeatureSetProto.FeatureSetSpec;
import feast.core.FeatureSetProto.FeatureSpec;
import feast.core.SourceProto.KafkaSourceConfig;
Expand Down Expand Up @@ -67,11 +68,14 @@

public class SpecServiceTest {

@Mock private FeatureSetRepository featureSetRepository;
@Mock
private FeatureSetRepository featureSetRepository;

@Mock private StoreRepository storeRepository;
@Mock
private StoreRepository storeRepository;

@Rule public final ExpectedException expectedException = ExpectedException.none();
@Rule
public final ExpectedException expectedException = ExpectedException.none();

private SpecService specService;
private List<FeatureSet> featureSets;
Expand All @@ -95,14 +99,25 @@ public void setUp() {
FeatureSet featureSet1v3 = newDummyFeatureSet("f1", 3);
FeatureSet featureSet2v1 = newDummyFeatureSet("f2", 1);

featureSets = Arrays.asList(featureSet1v1, featureSet1v2, featureSet1v3, featureSet2v1);
Field f3f1 = new Field("f3", "f3f1", Enum.INT64);
Field f3f2 = new Field("f3", "f3f2", Enum.INT64);
Field f3e1 = new Field("f3", "f3e1", Enum.STRING);
FeatureSet featureSet3v1 = new FeatureSet(
"f3", 1, 100L, Arrays.asList(f3e1), Arrays.asList(f3f2, f3f1), defaultSource);

featureSets = Arrays
.asList(featureSet1v1, featureSet1v2, featureSet1v3, featureSet2v1, featureSet3v1);
when(featureSetRepository.findAll()).thenReturn(featureSets);
when(featureSetRepository.findAllByOrderByNameAscVersionAsc()).thenReturn(featureSets);
when(featureSetRepository.findByName("f1")).thenReturn(featureSets.subList(0, 3));
when(featureSetRepository.findByName("f3")).thenReturn(featureSets.subList(4, 5));
when(featureSetRepository.findFirstFeatureSetByNameOrderByVersionDesc("f1"))
.thenReturn(featureSet1v3);
when(featureSetRepository.findByNameWithWildcard("f1")).thenReturn(featureSets.subList(0, 3));
when(featureSetRepository.findByNameWithWildcardOrderByNameAscVersionAsc("f1"))
.thenReturn(featureSets.subList(0, 3));
when(featureSetRepository.findByName("asd")).thenReturn(Lists.newArrayList());
when(featureSetRepository.findByNameWithWildcard("f%")).thenReturn(featureSets);
when(featureSetRepository.findByNameWithWildcardOrderByNameAscVersionAsc("f%"))
.thenReturn(featureSets);

Store store1 = newDummyStore("SERVING");
Store store2 = newDummyStore("WAREHOUSE");
Expand Down Expand Up @@ -238,7 +253,8 @@ public void shouldThrowExceptionGivenMissingFeatureSetName()
@Test
public void shouldThrowExceptionGivenMissingFeatureSet() throws InvalidProtocolBufferException {
expectedException.expect(StatusRuntimeException.class);
expectedException.expectMessage("NOT_FOUND: Feature set could not be found");
expectedException.expectMessage(
"NOT_FOUND: Feature set with name \"f1000\" and version \"2\" could not be found.");
specService.getFeatureSet(
GetFeatureSetRequest.newBuilder().setName("f1000").setVersion(2).build());
}
Expand Down Expand Up @@ -331,6 +347,28 @@ public void applyFeatureSetShouldIncrementFeatureSetVersionIfAlreadyExists()
assertThat(applyFeatureSetResponse.getFeatureSet(), equalTo(expected));
}


@Test
public void applyFeatureSetShouldNotCreateFeatureSetIfFieldsUnordered()
throws InvalidProtocolBufferException {

Field f3f1 = new Field("f3", "f3f1", Enum.INT64);
Field f3f2 = new Field("f3", "f3f2", Enum.INT64);
Field f3e1 = new Field("f3", "f3e1", Enum.STRING);
FeatureSetProto.FeatureSetSpec incomingFeatureSet = (new FeatureSet(
"f3", 5, 100L, Arrays.asList(f3e1), Arrays.asList(f3f2, f3f1), defaultSource)).toProto();

FeatureSetSpec expected = incomingFeatureSet;
ApplyFeatureSetResponse applyFeatureSetResponse =
specService.applyFeatureSet(incomingFeatureSet);
assertThat(applyFeatureSetResponse.getStatus(), equalTo(Status.NO_CHANGE));
assertThat(applyFeatureSetResponse.getFeatureSet().getMaxAge(), equalTo(expected.getMaxAge()));
assertThat(applyFeatureSetResponse.getFeatureSet().getEntities(0),
equalTo(expected.getEntities(0)));
assertThat(applyFeatureSetResponse.getFeatureSet().getName(), equalTo(expected.getName()));
}


@Test
public void shouldUpdateStoreIfConfigChanges() throws InvalidProtocolBufferException {
when(storeRepository.findById("SERVING")).thenReturn(Optional.of(stores.get(0)));
Expand Down
Loading

0 comments on commit c36c6f5

Please sign in to comment.