From ccf4e9a5bc72d2ee8cf9e6a3fa29c57398885660 Mon Sep 17 00:00:00 2001 From: Terence Date: Fri, 30 Oct 2020 10:07:02 +0800 Subject: [PATCH 1/3] Add feature table deletion Signed-off-by: Terence --- .../feast/common/it/SimpleCoreClient.java | 8 +++ .../java/feast/core/grpc/CoreServiceImpl.java | 26 ++++++++++ .../java/feast/core/model/FeatureTable.java | 37 ++++++++++++++ .../java/feast/core/service/SpecService.java | 28 +++++++++++ .../V3.0__Feature_Table_Deletion.sql | 3 ++ .../feast/core/service/SpecServiceIT.java | 49 +++++++++++++++++++ protos/feast/core/CoreService.proto | 14 ++++++ protos/feast/core/FeatureTable.proto | 4 ++ sdk/python/feast/client.py | 21 ++++++++ 9 files changed, 190 insertions(+) create mode 100644 core/src/main/resources/db/migration/V3.0__Feature_Table_Deletion.sql diff --git a/common-test/src/main/java/feast/common/it/SimpleCoreClient.java b/common-test/src/main/java/feast/common/it/SimpleCoreClient.java index b54dbfc857..cffb4f42af 100644 --- a/common-test/src/main/java/feast/common/it/SimpleCoreClient.java +++ b/common-test/src/main/java/feast/common/it/SimpleCoreClient.java @@ -218,4 +218,12 @@ public FeatureTableProto.FeatureTable applyFeatureTable( .build()) .getTable(); } + + public void deleteFeatureTable(String projectName, String featureTableName) { + stub.deleteFeatureTable( + CoreServiceProto.DeleteFeatureTableRequest.newBuilder() + .setProject(projectName) + .setName(featureTableName) + .build()); + } } diff --git a/core/src/main/java/feast/core/grpc/CoreServiceImpl.java b/core/src/main/java/feast/core/grpc/CoreServiceImpl.java index ef7218d13f..5193bc63b2 100644 --- a/core/src/main/java/feast/core/grpc/CoreServiceImpl.java +++ b/core/src/main/java/feast/core/grpc/CoreServiceImpl.java @@ -491,4 +491,30 @@ public void getFeatureTable( Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException()); } } + + @Override + public void deleteFeatureTable( + DeleteFeatureTableRequest request, + StreamObserver responseObserver) { + String projectName = request.getProject(); + try { + // Check if user has authorization to delete feature table + authorizationService.authorizeRequest(SecurityContextHolder.getContext(), projectName); + specService.deleteFeatureTable(request); + + responseObserver.onNext(DeleteFeatureTableResponse.getDefaultInstance()); + responseObserver.onCompleted(); + } catch (NoSuchElementException e) { + log.error( + String.format( + "DeleteFeatureTable: No such Feature Table: (project: %s, name: %s)", + request.getProject(), request.getName())); + responseObserver.onError( + Status.NOT_FOUND.withDescription(e.getMessage()).withCause(e).asRuntimeException()); + } catch (Exception e) { + log.error("DeleteFeatureTable: Exception has occurred: ", e); + responseObserver.onError( + Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException()); + } + } } diff --git a/core/src/main/java/feast/core/model/FeatureTable.java b/core/src/main/java/feast/core/model/FeatureTable.java index 90cd79e6cf..14decd7b46 100644 --- a/core/src/main/java/feast/core/model/FeatureTable.java +++ b/core/src/main/java/feast/core/model/FeatureTable.java @@ -16,6 +16,8 @@ */ package feast.core.model; +import com.google.common.primitives.Longs; +import com.google.protobuf.ByteString; import com.google.protobuf.Duration; import com.google.protobuf.Timestamp; import feast.core.dao.EntityRepository; @@ -107,6 +109,12 @@ public class FeatureTable extends AbstractTimestampEntity { @Column(name = "revision", nullable = false) private int revision; + @Column(name = "is_deleted", nullable = false) + private boolean isDeleted; + + @Column(name = "metadata_hash", nullable = false) + private long metadataHash; + public FeatureTable() {}; /** @@ -147,6 +155,13 @@ public static FeatureTable fromProto( table.setStreamSource(DataSource.fromProto(spec.getStreamSource())); } + table.setMetadataHash( + Objects.hash( + table.getEntities(), + table.getFeatures(), + table.getBatchSource(), + table.getStreamSource())); + return table; } @@ -202,6 +217,14 @@ public void updateFromProto( this.streamSource = null; } + // Update hash + this.setMetadataHash( + Objects.hash( + this.getEntities(), this.getFeatures(), this.getBatchSource(), this.getStreamSource())); + + // Set isDeleted to false + this.setDeleted(false); + // Bump revision no. this.revision++; } @@ -211,6 +234,14 @@ public FeatureTableProto.FeatureTable toProto() { // Convert field types to Protobuf compatible types Timestamp creationTime = TypeConversion.convertTimestamp(getCreated()); Timestamp updatedTime = TypeConversion.convertTimestamp(getLastUpdated()); + ByteString metadataHashBytes = + ByteString.copyFrom( + Longs.toByteArray( + Objects.hash( + this.getEntities(), + this.getFeatures(), + this.getBatchSource(), + this.getStreamSource()))); List featureSpecs = getFeatures().stream().map(FeatureV2::toProto).collect(Collectors.toList()); @@ -236,6 +267,7 @@ public FeatureTableProto.FeatureTable toProto() { .setRevision(getRevision()) .setCreatedTimestamp(creationTime) .setLastUpdatedTimestamp(updatedTime) + .setHash(metadataHashBytes) .build()) .setSpec(spec.build()) .build(); @@ -279,6 +311,11 @@ public Map getLabelsMap() { return TypeConversion.convertJsonStringToMap(getLabelsJSON()); } + public void delete() { + this.setDeleted(true); + this.setRevision(0); + } + @Override public int hashCode() { return Objects.hash( diff --git a/core/src/main/java/feast/core/service/SpecService.java b/core/src/main/java/feast/core/service/SpecService.java index 0db898d12e..9f6569fb82 100644 --- a/core/src/main/java/feast/core/service/SpecService.java +++ b/core/src/main/java/feast/core/service/SpecService.java @@ -37,6 +37,7 @@ import feast.proto.core.CoreServiceProto.ApplyFeatureSetResponse.Status; import feast.proto.core.CoreServiceProto.ApplyFeatureTableRequest; import feast.proto.core.CoreServiceProto.ApplyFeatureTableResponse; +import feast.proto.core.CoreServiceProto.DeleteFeatureTableRequest; import feast.proto.core.CoreServiceProto.GetEntityRequest; import feast.proto.core.CoreServiceProto.GetEntityResponse; import feast.proto.core.CoreServiceProto.GetFeatureSetRequest; @@ -702,6 +703,7 @@ public ListFeatureTablesResponse listFeatureTables(ListFeatureTablesRequest.Filt matchingTables = matchingTables.stream() .filter(table -> table.hasAllLabels(labelsFilter)) + .filter(table -> !table.isDeleted()) .collect(Collectors.toList()); } for (FeatureTable table : matchingTables) { @@ -735,10 +737,36 @@ public GetFeatureTableResponse getFeatureTable(GetFeatureTableRequest request) { "No such Feature Table: (project: %s, name: %s)", projectName, featureTableName)); } + if (retrieveTable.get().isDeleted()) { + throw new NoSuchElementException( + String.format( + "Feature Table has been deleted: (project: %s, name: %s)", + projectName, featureTableName)); + } + // Build GetFeatureTableResponse GetFeatureTableResponse response = GetFeatureTableResponse.newBuilder().setTable(retrieveTable.get().toProto()).build(); return response; } + + @Transactional + public void deleteFeatureTable(DeleteFeatureTableRequest request) { + String projectName = resolveProjectName(request.getProject()); + String featureTableName = request.getName(); + + checkValidCharacters(projectName, "project"); + checkValidCharacters(featureTableName, "featureTable"); + + Optional existingTable = + tableRepository.findFeatureTableByNameAndProject_Name(featureTableName, projectName); + if (existingTable.isEmpty()) { + throw new NoSuchElementException( + String.format( + "No such Feature Table: (project: %s, name: %s)", projectName, featureTableName)); + } + + existingTable.get().delete(); + } } diff --git a/core/src/main/resources/db/migration/V3.0__Feature_Table_Deletion.sql b/core/src/main/resources/db/migration/V3.0__Feature_Table_Deletion.sql new file mode 100644 index 0000000000..1434550243 --- /dev/null +++ b/core/src/main/resources/db/migration/V3.0__Feature_Table_Deletion.sql @@ -0,0 +1,3 @@ +ALTER TABLE feature_tables ADD COLUMN is_deleted boolean NOT NULL; + +ALTER TABLE feature_tables ADD COLUMN metadata_hash bigint NOT NULL; \ No newline at end of file diff --git a/core/src/test/java/feast/core/service/SpecServiceIT.java b/core/src/test/java/feast/core/service/SpecServiceIT.java index 541580718c..1578b9775b 100644 --- a/core/src/test/java/feast/core/service/SpecServiceIT.java +++ b/core/src/test/java/feast/core/service/SpecServiceIT.java @@ -1336,4 +1336,53 @@ public void shouldErrorOnArchivedProject() { .build())); } } + + @Nested + public class DeleteFeatureTable { + + @Test + public void shouldReturnNoTables() { + String projectName = "default"; + String featureTableName = "featuretable1"; + + apiClient.deleteFeatureTable(projectName, featureTableName); + + CoreServiceProto.ListFeatureTablesRequest.Filter filter = + CoreServiceProto.ListFeatureTablesRequest.Filter.newBuilder() + .setProject("default") + .build(); + List featureTables = + apiClient.simpleListFeatureTables(filter); + + StatusRuntimeException exc = + assertThrows( + StatusRuntimeException.class, + () -> apiClient.simpleGetFeatureTable(projectName, featureTableName)); + + assertThat(featureTables.size(), equalTo(0)); + assertThat( + exc.getMessage(), + equalTo( + String.format( + "NOT_FOUND: Feature Table has been deleted: (project: %s, name: %s)", + projectName, featureTableName))); + } + + @Test + public void shouldErrorIfTableNotExist() { + String projectName = "default"; + String featureTableName = "nonexistent_table"; + StatusRuntimeException exc = + assertThrows( + StatusRuntimeException.class, + () -> apiClient.deleteFeatureTable(projectName, featureTableName)); + + assertThat( + exc.getMessage(), + equalTo( + String.format( + "NOT_FOUND: No such Feature Table: (project: %s, name: %s)", + projectName, featureTableName))); + } + } } diff --git a/protos/feast/core/CoreService.proto b/protos/feast/core/CoreService.proto index a62dc56758..5e7eb82418 100644 --- a/protos/feast/core/CoreService.proto +++ b/protos/feast/core/CoreService.proto @@ -134,6 +134,9 @@ service CoreService { // Returns a specific feature table rpc GetFeatureTable (GetFeatureTableRequest) returns (GetFeatureTableResponse); + // Delete a specific feature table + rpc DeleteFeatureTable (DeleteFeatureTableRequest) returns (DeleteFeatureTableResponse); + } service JobControllerService { @@ -500,3 +503,14 @@ message ListFeatureTablesResponse { // List of matching Feature Tables repeated FeatureTable tables = 1; } + +message DeleteFeatureTableRequest { + // Optional. Name of the Project to delete the Feature Table from. + // If unspecified, will delete FeatureTable from the default project. + string project = 1; + + // Name of the FeatureTable to delete. + string name = 2; +} + +message DeleteFeatureTableResponse {} diff --git a/protos/feast/core/FeatureTable.proto b/protos/feast/core/FeatureTable.proto index 8ddd5fab2e..6550507024 100644 --- a/protos/feast/core/FeatureTable.proto +++ b/protos/feast/core/FeatureTable.proto @@ -76,4 +76,8 @@ message FeatureTableMeta { // Auto incrementing revision no. of this Feature Table int64 revision = 3; + + // Hash entities, features, batch_source and stream_source to inform JobService if + // jobs should be restarted should hash change + bytes hash = 4; } diff --git a/sdk/python/feast/client.py b/sdk/python/feast/client.py index 6c57183f8a..5c15e6e022 100644 --- a/sdk/python/feast/client.py +++ b/sdk/python/feast/client.py @@ -51,6 +51,7 @@ ArchiveProjectResponse, CreateProjectRequest, CreateProjectResponse, + DeleteFeatureTableRequest, GetEntityRequest, GetEntityResponse, GetFeastCoreVersionRequest, @@ -679,6 +680,26 @@ def get_feature_table(self, name: str, project: str = None) -> FeatureTable: raise grpc.RpcError(e.details()) return FeatureTable.from_proto(get_feature_table_response.table) + def delete_feature_table(self, name: str, project: str = None) -> None: + """ + Deletes a feature table. + + Args: + project: Feast project that this feature table belongs to + name: Name of feature table + """ + + if project is None: + project = self.project + + try: + self._core_service.DeleteFeatureTable( + DeleteFeatureTableRequest(project=project, name=name.strip()), + metadata=self._get_grpc_metadata(), + ) + except grpc.RpcError as e: + raise grpc.RpcError(e.details()) + def ingest( self, feature_table: Union[str, FeatureTable], From cd50a3f293986c0c2e9e6324098da8711aafe1c2 Mon Sep 17 00:00:00 2001 From: Terence Date: Mon, 2 Nov 2020 11:53:23 +0800 Subject: [PATCH 2/3] Use proto hash and remove hash from db Signed-off-by: Terence --- .../java/feast/core/model/FeatureTable.java | 60 +++++++++---------- .../V3.0__Feature_Table_Deletion.sql | 4 +- protos/feast/core/FeatureTable.proto | 2 +- 3 files changed, 31 insertions(+), 35 deletions(-) diff --git a/core/src/main/java/feast/core/model/FeatureTable.java b/core/src/main/java/feast/core/model/FeatureTable.java index 14decd7b46..8c62df6486 100644 --- a/core/src/main/java/feast/core/model/FeatureTable.java +++ b/core/src/main/java/feast/core/model/FeatureTable.java @@ -16,8 +16,7 @@ */ package feast.core.model; -import com.google.common.primitives.Longs; -import com.google.protobuf.ByteString; +import com.google.common.hash.Hashing; import com.google.protobuf.Duration; import com.google.protobuf.Timestamp; import feast.core.dao.EntityRepository; @@ -26,12 +25,7 @@ import feast.proto.core.FeatureProto.FeatureSpecV2; import feast.proto.core.FeatureTableProto; import feast.proto.core.FeatureTableProto.FeatureTableSpec; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.Set; +import java.util.*; import java.util.stream.Collectors; import javax.persistence.CascadeType; import javax.persistence.Column; @@ -112,9 +106,6 @@ public class FeatureTable extends AbstractTimestampEntity { @Column(name = "is_deleted", nullable = false) private boolean isDeleted; - @Column(name = "metadata_hash", nullable = false) - private long metadataHash; - public FeatureTable() {}; /** @@ -155,13 +146,6 @@ public static FeatureTable fromProto( table.setStreamSource(DataSource.fromProto(spec.getStreamSource())); } - table.setMetadataHash( - Objects.hash( - table.getEntities(), - table.getFeatures(), - table.getBatchSource(), - table.getStreamSource())); - return table; } @@ -217,11 +201,6 @@ public void updateFromProto( this.streamSource = null; } - // Update hash - this.setMetadataHash( - Objects.hash( - this.getEntities(), this.getFeatures(), this.getBatchSource(), this.getStreamSource())); - // Set isDeleted to false this.setDeleted(false); @@ -234,14 +213,7 @@ public FeatureTableProto.FeatureTable toProto() { // Convert field types to Protobuf compatible types Timestamp creationTime = TypeConversion.convertTimestamp(getCreated()); Timestamp updatedTime = TypeConversion.convertTimestamp(getLastUpdated()); - ByteString metadataHashBytes = - ByteString.copyFrom( - Longs.toByteArray( - Objects.hash( - this.getEntities(), - this.getFeatures(), - this.getBatchSource(), - this.getStreamSource()))); + String metadataHashBytes = this.protoHash(); List featureSpecs = getFeatures().stream().map(FeatureV2::toProto).collect(Collectors.toList()); @@ -316,6 +288,32 @@ public void delete() { this.setRevision(0); } + public String protoHash() { + List sortedEntities = + this.getEntities().stream().map(entity -> entity.getName()).collect(Collectors.toList()); + Collections.sort(sortedEntities); + + List sortedFeatures = new ArrayList(this.getFeatures()); + List sortedFeatureSpecs = + sortedFeatures.stream().map(featureV2 -> featureV2.toProto()).collect(Collectors.toList()); + sortedFeatures.sort(Comparator.comparing(FeatureV2::getName)); + + DataSourceProto.DataSource streamSource = DataSourceProto.DataSource.getDefaultInstance(); + if (getStreamSource() != null) { + streamSource = getStreamSource().toProto(); + } + + FeatureTableSpec featureTableSpec = + FeatureTableSpec.newBuilder() + .addAllEntities(sortedEntities) + .addAllFeatures(sortedFeatureSpecs) + .setBatchSource(getBatchSource().toProto()) + .setStreamSource(streamSource) + .setMaxAge(Duration.newBuilder().setSeconds(getMaxAgeSecs()).build()) + .build(); + return Hashing.murmur3_32().hashBytes(featureTableSpec.toByteArray()).toString(); + } + @Override public int hashCode() { return Objects.hash( diff --git a/core/src/main/resources/db/migration/V3.0__Feature_Table_Deletion.sql b/core/src/main/resources/db/migration/V3.0__Feature_Table_Deletion.sql index 1434550243..e915ec07d4 100644 --- a/core/src/main/resources/db/migration/V3.0__Feature_Table_Deletion.sql +++ b/core/src/main/resources/db/migration/V3.0__Feature_Table_Deletion.sql @@ -1,3 +1 @@ -ALTER TABLE feature_tables ADD COLUMN is_deleted boolean NOT NULL; - -ALTER TABLE feature_tables ADD COLUMN metadata_hash bigint NOT NULL; \ No newline at end of file +ALTER TABLE feature_tables ADD COLUMN is_deleted boolean NOT NULL; \ No newline at end of file diff --git a/protos/feast/core/FeatureTable.proto b/protos/feast/core/FeatureTable.proto index 6550507024..279072fccf 100644 --- a/protos/feast/core/FeatureTable.proto +++ b/protos/feast/core/FeatureTable.proto @@ -79,5 +79,5 @@ message FeatureTableMeta { // Hash entities, features, batch_source and stream_source to inform JobService if // jobs should be restarted should hash change - bytes hash = 4; + string hash = 4; } From c3d389ed9f753d04f47741f22b35bc851b6e9d73 Mon Sep 17 00:00:00 2001 From: Terence Date: Mon, 2 Nov 2020 16:51:42 +0800 Subject: [PATCH 3/3] Add test for reapply after deletion Signed-off-by: Terence --- .../feast/core/service/SpecServiceIT.java | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/core/src/test/java/feast/core/service/SpecServiceIT.java b/core/src/test/java/feast/core/service/SpecServiceIT.java index 1578b9775b..8d56de606b 100644 --- a/core/src/test/java/feast/core/service/SpecServiceIT.java +++ b/core/src/test/java/feast/core/service/SpecServiceIT.java @@ -1368,6 +1368,37 @@ public void shouldReturnNoTables() { projectName, featureTableName))); } + @Test + public void shouldUpdateDeletedTable() { + String projectName = "default"; + String featureTableName = "featuretable1"; + + apiClient.deleteFeatureTable(projectName, featureTableName); + + FeatureTableSpec featureTableSpec = + DataGenerator.createFeatureTableSpec( + featureTableName, + Arrays.asList("entity1", "entity2"), + new HashMap<>() { + { + put("feature3", ValueProto.ValueType.Enum.INT64); + } + }, + 7200, + ImmutableMap.of("feat_key3", "feat_value3")) + .toBuilder() + .setBatchSource( + DataGenerator.createFileDataSourceSpec("file:///path/to/file", "ts_col", "")) + .build(); + + apiClient.applyFeatureTable(projectName, featureTableSpec); + + FeatureTableProto.FeatureTable featureTable = + apiClient.simpleGetFeatureTable(projectName, featureTableName); + + assertTrue(TestUtil.compareFeatureTableSpec(featureTable.getSpec(), featureTableSpec)); + } + @Test public void shouldErrorIfTableNotExist() { String projectName = "default";