Skip to content

Commit

Permalink
Add Feature Tables API to Core & Python SDK (#1019)
Browse files Browse the repository at this point in the history
* Reorganise existing protos in CoreService by type.

Signed-off-by: Terence <[email protected]>

* Add new FeatureTables API to Core Protobuf definitions

Signed-off-by: Terence <[email protected]>

* Fix name collision in proto java outer classname with message name

Signed-off-by: Terence <[email protected]>

* Add missing max age field to Feature Table proto.

Signed-off-by: Terence <[email protected]>

* Add Flyway DB migration to add Feature Table API.

Signed-off-by: Terence <[email protected]>

* Rename options field to options_json and change type to text.

* Options to be stored as Protobuf JSON.
* Change from varchar to text to remove char limit

Signed-off-by: Terence <[email protected]>

* FeatureTable: Rename entity_names to entities

Signed-off-by: Terence <[email protected]>

* Revert Reorganise existing protos in CoreService by type as it make it hard for reviewers to review changes

Signed-off-by: Terence <[email protected]>

* Add FeatureSource entity for native representation of FeatureSource protobuf

Signed-off-by: Terence <[email protected]>

* Add missing nullable annotation on FeatureSource entity.

Signed-off-by: Terence <[email protected]>

* Update ListFeatureTablesRequest's Filter to follow naming convention.

Signed-off-by: Terence <[email protected]>

* Add missing serialization code for FeatureSource's field mapping.

Signed-off-by: Terence <[email protected]>

* Split Feature proto from FeatureTable proto.

Signed-off-by: Terence <[email protected]>

* Update FeatureTable entity_names field to entities

Signed-off-by: Terence <[email protected]>

* Revert putting project in feature table spec

Signed-off-by: Terence <[email protected]>

* Update ListFeatureTable Proto to return full FeatureTable objects and limit to listing from one Project.

Signed-off-by: Terence <[email protected]>

* Fix typo in CoreService proto

Signed-off-by: Terence <[email protected]>

* Add FeatureV2 core model to store FeatureSpecV2

Signed-off-by: Terence <[email protected]>

* Add FeatureTable core model to store FeatureTable protos

Signed-off-by: Terence <[email protected]>

* Fix naming grammar in CoreService proto

Signed-off-by: Terence <[email protected]>

* Standardise naming of specifying projects in CoreService proto

Signed-off-by: Terence <[email protected]>

* Rename FeatureSource proto to FeatureSourceSpec for compatiblity.

Signed-off-by: Terence <[email protected]>

* Update FeatureSource model to store type specific options as seperate columns instead of JSON.

Signed-off-by: Terence <[email protected]>

* Add FeatureTableTest unit test to test FeatureTable core model

Signed-off-by: Terence <[email protected]>

* Add FeatureTableValidator to validate FeatureTableSpec protobufs

Signed-off-by: Terence <[email protected]>

* Add listFeatureTables(), applyFeatureTable() & getFeatureTable() to Core's SpecService

Signed-off-by: Terence <[email protected]>

* Add FeatureTableRepository to save & retrieve FeatureTables in database.

Signed-off-by: Terence <[email protected]>

* Fix hibernate errors on Feast Core boot.

Signed-off-by: Terence <[email protected]>

* Implement listFeatureTables() , applyFeatureTable(), and getFeatureTable() in CoreServiceImpl

Signed-off-by: Terence <[email protected]>

* Add applyFeatureSet integration tests SpecServiceIT

Signed-off-by: Terence <[email protected]>

* Various fixes for creating FeatureTabes with applyFeatureTable

Signed-off-by: Terence <[email protected]>

* Fixed bug with updating FeatureTable

Signed-off-by: Terence <[email protected]>

* Update ListFeatureTables

Signed-off-by: Terence <[email protected]>

* Add Python SDK

Signed-off-by: Terence <[email protected]>

* Update GetFeatureTable

Signed-off-by: Terence <[email protected]>

* Remove unused proto imports and generate go protos

Signed-off-by: Terence <[email protected]>

* Fix ListFeatureTables IT

Signed-off-by: Terence <[email protected]>

* Update comment to generalize FeatureSource's field mapping to all fields instead of just for feature.

Signed-off-by: Terence <[email protected]>

* Fix feature table validator condition

Signed-off-by: Terence <[email protected]>

* Fix feature table unit tests

Signed-off-by: Terence <[email protected]>

* Update feature source proto

Signed-off-by: Terence <[email protected]>

* Address PR comments

Signed-off-by: Terence <[email protected]>

* Replace test with IT

Signed-off-by: Terence <[email protected]>

* Update IT config

Signed-off-by: Terence <[email protected]>

* Fix removal of entity check

Signed-off-by: Terence <[email protected]>

* Fix test sort issue

Signed-off-by: Terence <[email protected]>

* Store source options as json

Signed-off-by: Terence <[email protected]>

* Update go protos

Signed-off-by: Terence <[email protected]>

* Remove go FeatureSource proto

Signed-off-by: Terence <[email protected]>

* Increase IT max pool size

Signed-off-by: Terence <[email protected]>

* Reduce pool size instead

Signed-off-by: Terence <[email protected]>

* Replace mutablemapping with dict

Signed-off-by: Terence <[email protected]>

* Standardize use of timestamp_column instead of ts_column

Signed-off-by: Terence <[email protected]>

Co-authored-by: Terence Lim <[email protected]>
  • Loading branch information
mrzzy and terryyylim authored Oct 2, 2020
1 parent 87ee594 commit 442ca5a
Show file tree
Hide file tree
Showing 35 changed files with 5,913 additions and 476 deletions.
78 changes: 78 additions & 0 deletions common-test/src/main/java/feast/common/it/DataGenerator.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,16 @@
package feast.common.it;

import com.google.common.collect.ImmutableList;
import com.google.protobuf.Duration;
import feast.proto.core.DataSourceProto.DataSource;
import feast.proto.core.DataSourceProto.DataSource.BigQueryOptions;
import feast.proto.core.DataSourceProto.DataSource.FileOptions;
import feast.proto.core.DataSourceProto.DataSource.KafkaOptions;
import feast.proto.core.EntityProto;
import feast.proto.core.FeatureProto;
import feast.proto.core.FeatureProto.FeatureSpecV2;
import feast.proto.core.FeatureSetProto;
import feast.proto.core.FeatureTableProto.FeatureTableSpec;
import feast.proto.core.SourceProto;
import feast.proto.core.StoreProto;
import feast.proto.types.ValueProto;
Expand Down Expand Up @@ -130,6 +138,15 @@ public static EntityProto.EntitySpecV2 createEntitySpecV2(
.build();
}

public static FeatureProto.FeatureSpecV2 createFeatureSpecV2(
String name, ValueProto.ValueType.Enum valueType, Map<String, String> labels) {
return FeatureProto.FeatureSpecV2.newBuilder()
.setName(name)
.setValueType(valueType)
.putAllLabels(labels)
.build();
}

public static FeatureSetProto.FeatureSet createFeatureSet(
SourceProto.Source source,
String projectName,
Expand Down Expand Up @@ -193,4 +210,65 @@ public static FeatureSetProto.FeatureSet createFeatureSet(
return createFeatureSet(
source, projectName, name, Collections.emptyMap(), Collections.emptyMap());
}

// Create a Feature Table spec without DataSources configured.
public static FeatureTableSpec createFeatureTableSpec(
String name,
List<String> entities,
Map<String, ValueProto.ValueType.Enum> features,
int maxAgeSecs,
Map<String, String> labels) {

return FeatureTableSpec.newBuilder()
.setName(name)
.addAllEntities(entities)
.addAllFeatures(
features.entrySet().stream()
.map(
entry ->
FeatureSpecV2.newBuilder()
.setName(entry.getKey())
.setValueType(entry.getValue())
.putAllLabels(labels)
.build())
.collect(Collectors.toList()))
.setMaxAge(Duration.newBuilder().setSeconds(3600).build())
.putAllLabels(labels)
.build();
}

public static DataSource createFileDataSourceSpec(
String fileURL, String fileFormat, String timestampColumn, String datePartitionColumn) {
return DataSource.newBuilder()
.setType(DataSource.SourceType.BATCH_FILE)
.setFileOptions(
FileOptions.newBuilder().setFileFormat(fileFormat).setFileUrl(fileURL).build())
.setTimestampColumn(timestampColumn)
.setDatePartitionColumn(datePartitionColumn)
.build();
}

public static DataSource createBigQueryDataSourceSpec(
String bigQueryTableRef, String timestampColumn, String datePartitionColumn) {
return DataSource.newBuilder()
.setType(DataSource.SourceType.BATCH_BIGQUERY)
.setBigqueryOptions(BigQueryOptions.newBuilder().setTableRef(bigQueryTableRef).build())
.setTimestampColumn(timestampColumn)
.setDatePartitionColumn(datePartitionColumn)
.build();
}

public static DataSource createKafkaDataSourceSpec(
String servers, String topic, String classPath, String timestampColumn) {
return DataSource.newBuilder()
.setType(DataSource.SourceType.STREAM_KAFKA)
.setKafkaOptions(
KafkaOptions.newBuilder()
.setTopic(topic)
.setBootstrapServers(servers)
.setClassPath(classPath)
.build())
.setTimestampColumn(timestampColumn)
.build();
}
}
28 changes: 28 additions & 0 deletions common-test/src/main/java/feast/common/it/SimpleCoreClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package feast.common.it;

import feast.proto.core.*;
import feast.proto.core.CoreServiceProto.ApplyFeatureTableRequest;
import feast.proto.core.FeatureTableProto.FeatureTableSpec;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -75,6 +77,13 @@ public List<EntityProto.Entity> simpleListEntities(
.getEntitiesList();
}

public List<FeatureTableProto.FeatureTable> simpleListFeatureTables(
CoreServiceProto.ListFeatureTablesRequest.Filter filter) {
return stub.listFeatureTables(
CoreServiceProto.ListFeatureTablesRequest.newBuilder().setFilter(filter).build())
.getTablesList();
}

public List<FeatureSetProto.FeatureSet> simpleListFeatureSets(
String projectName, String featureSetName, Map<String, String> labels) {
return stub.listFeatureSets(
Expand Down Expand Up @@ -131,6 +140,15 @@ public EntityProto.Entity simpleGetEntity(String projectName, String name) {
.getEntity();
}

public FeatureTableProto.FeatureTable simpleGetFeatureTable(String projectName, String name) {
return stub.getFeatureTable(
CoreServiceProto.GetFeatureTableRequest.newBuilder()
.setName(name)
.setProject(projectName)
.build())
.getTable();
}

public void updateFeatureSetStatus(
String projectName, String name, FeatureSetProto.FeatureSetStatus status) {
stub.updateFeatureSetStatus(
Expand Down Expand Up @@ -190,4 +208,14 @@ public FeatureSetProto.FeatureSet getFeatureSet(String projectName, String featu
.build())
.getFeatureSet();
}

public FeatureTableProto.FeatureTable applyFeatureTable(
String projectName, FeatureTableSpec spec) {
return stub.applyFeatureTable(
ApplyFeatureTableRequest.newBuilder()
.setProject(projectName)
.setTableSpec(spec)
.build())
.getTable();
}
}
34 changes: 34 additions & 0 deletions common-test/src/main/java/feast/common/util/TestUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@

import feast.common.logging.AuditLogger;
import feast.common.logging.config.LoggingProperties;
import feast.proto.core.FeatureProto.FeatureSpecV2;
import feast.proto.core.FeatureTableProto.FeatureTableSpec;
import java.util.Comparator;
import java.util.stream.Collectors;
import org.springframework.boot.info.BuildProperties;

public class TestUtil {
Expand All @@ -37,4 +41,34 @@ public static void setupAuditLogger() {

new AuditLogger(loggingProperties, buildProperties);
}

/**
* Compare if two Feature Table specs are equal. Disregards order of features/entities in spec.
*/
public static boolean compareFeatureTableSpec(FeatureTableSpec spec, FeatureTableSpec otherSpec) {
spec =
spec.toBuilder()
.clearFeatures()
.addAllFeatures(
spec.getFeaturesList().stream()
.sorted(Comparator.comparing(FeatureSpecV2::getName))
.collect(Collectors.toSet()))
.clearEntities()
.addAllEntities(spec.getEntitiesList().stream().sorted().collect(Collectors.toSet()))
.build();

otherSpec =
otherSpec
.toBuilder()
.clearFeatures()
.addAllFeatures(
spec.getFeaturesList().stream()
.sorted(Comparator.comparing(FeatureSpecV2::getName))
.collect(Collectors.toSet()))
.clearEntities()
.addAllEntities(spec.getEntitiesList().stream().sorted().collect(Collectors.toSet()))
.build();

return spec.equals(otherSpec);
}
}
31 changes: 31 additions & 0 deletions core/src/main/java/feast/core/dao/FeatureTableRepository.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2020 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.core.dao;

import feast.core.model.FeatureTable;
import java.util.List;
import java.util.Optional;
import org.springframework.data.jpa.repository.JpaRepository;

/** JPA repository for querying FeatureTables stored. */
public interface FeatureTableRepository extends JpaRepository<FeatureTable, Long> {
// Find single FeatureTable by project and name
Optional<FeatureTable> findFeatureTableByNameAndProject_Name(String name, String projectName);

// Find FeatureTables by project
List<FeatureTable> findAllByProject_Name(String projectName);
}
104 changes: 102 additions & 2 deletions core/src/main/java/feast/core/grpc/CoreServiceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.server.service.GrpcService;
Expand Down Expand Up @@ -285,8 +286,8 @@ public void applyFeatureSet(
String projectId = null;

try {
FeatureSet featureSet = specService.imputeProjectName(request.getFeatureSet());
projectId = featureSet.getSpec().getProject();
FeatureSet featureSet = request.getFeatureSet();
projectId = SpecService.resolveProjectName(featureSet.getSpec().getProject());
authorizationService.authorizeRequest(SecurityContextHolder.getContext(), projectId);
ApplyFeatureSetResponse response = specService.applyFeatureSet(featureSet);
responseObserver.onNext(response);
Expand Down Expand Up @@ -391,4 +392,103 @@ public void listProjects(
Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException());
}
}

@Override
public void applyFeatureTable(
ApplyFeatureTableRequest request,
StreamObserver<ApplyFeatureTableResponse> responseObserver) {
String projectName = SpecService.resolveProjectName(request.getProject());
String tableName = request.getTableSpec().getName();

try {
// Check if user has authorization to apply feature table
authorizationService.authorizeRequest(SecurityContextHolder.getContext(), projectName);

ApplyFeatureTableResponse response = specService.applyFeatureTable(request);
responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (AccessDeniedException e) {
log.info(
String.format(
"ApplyFeatureTable: Not authorized to access project to apply: %s", projectName));
responseObserver.onError(
Status.PERMISSION_DENIED
.withDescription(e.getMessage())
.withCause(e)
.asRuntimeException());
} catch (org.hibernate.exception.ConstraintViolationException e) {
log.error(
String.format(
"ApplyFeatureTable: Unable to apply Feature Table due to a conflict: "
+ "Ensure that name is unique within Project: (name: %s, project: %s)",
projectName, tableName));
responseObserver.onError(
Status.ALREADY_EXISTS.withDescription(e.getMessage()).withCause(e).asRuntimeException());
} catch (IllegalArgumentException e) {
log.error(
String.format(
"ApplyFeatureTable: Invalid apply Feature Table Request: (name: %s, project: %s)",
projectName, tableName));
responseObserver.onError(
Status.INVALID_ARGUMENT
.withDescription(e.getMessage())
.withCause(e)
.asRuntimeException());
} catch (UnsupportedOperationException e) {
log.error(
String.format(
"ApplyFeatureTable: Unsupported apply Feature Table Request: (name: %s, project: %s)",
projectName, tableName));
responseObserver.onError(
Status.UNIMPLEMENTED.withDescription(e.getMessage()).withCause(e).asRuntimeException());
} catch (Exception e) {
log.error("ApplyFeatureTable Exception has occurred:", e);
responseObserver.onError(
Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException());
}
}

@Override
public void listFeatureTables(
ListFeatureTablesRequest request,
StreamObserver<ListFeatureTablesResponse> responseObserver) {
try {
ListFeatureTablesResponse response = specService.listFeatureTables(request.getFilter());
responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (IllegalArgumentException e) {
log.error(String.format("ListFeatureTable: Invalid list Feature Table Request"));
responseObserver.onError(
Status.INVALID_ARGUMENT
.withDescription(e.getMessage())
.withCause(e)
.asRuntimeException());
} catch (Exception e) {
log.error("ListFeatureTable: Exception has occurred: ", e);
responseObserver.onError(
Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException());
}
}

@Override
public void getFeatureTable(
GetFeatureTableRequest request, StreamObserver<GetFeatureTableResponse> responseObserver) {
try {
GetFeatureTableResponse response = specService.getFeatureTable(request);

responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (NoSuchElementException e) {
log.error(
String.format(
"GetFeatureTable: No such Feature Table: (project: %s, name: %s)",
request.getProject(), request.getName()));
responseObserver.onError(
Status.NOT_FOUND.withDescription(e.getMessage()).withCause(e).asRuntimeException());
} catch (Exception e) {
log.error("GetFeatureTable: Exception has occurred: ", e);
responseObserver.onError(
Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException());
}
}
}
Loading

0 comments on commit 442ca5a

Please sign in to comment.