From 258d15735b377d418ee06b6997ec273a56e5b2cf Mon Sep 17 00:00:00 2001 From: medgar Date: Fri, 30 Jul 2021 14:08:32 +1000 Subject: [PATCH] fix: add child doc deletion --- contracts | 2 +- pkg/plugins/document/boltdb/boltdb.go | 35 +++++++- pkg/plugins/document/dynamodb/dynamodb.go | 89 ++++++++++++++++++++- pkg/plugins/document/firestore/firestore.go | 46 +++++++++-- pkg/plugins/gateway/app_platform/http.go | 2 +- pkg/providers/azure/plugin.go | 9 +-- pluggable_membrane.go | 25 +++--- tests/plugins/document/suite.go | 23 +++++- 8 files changed, 199 insertions(+), 32 deletions(-) diff --git a/contracts b/contracts index 795f11fa1..43e281ab9 160000 --- a/contracts +++ b/contracts @@ -1 +1 @@ -Subproject commit 795f11fa13aa853eff9e09e2b0c97a255acc9180 +Subproject commit 43e281ab93df628091db864580d6a1dbd6d2d0f8 diff --git a/pkg/plugins/document/boltdb/boltdb.go b/pkg/plugins/document/boltdb/boltdb.go index 5a5045e05..5194379a3 100644 --- a/pkg/plugins/document/boltdb/boltdb.go +++ b/pkg/plugins/document/boltdb/boltdb.go @@ -113,10 +113,26 @@ func (s *BoltDocService) Delete(key *document.Key) error { doc := createDoc(key) err = db.DeleteStruct(&doc) + if err != nil { + return err + } + + // Delete sub collection documents + if key.Collection.Parent == nil { + childDocs, err := fetchChildDocs(key, db) + if err != nil { + return err + } - // TODO: delete sub collection records + for _, childDoc := range childDocs { + err = db.DeleteStruct(&childDoc) + if err != nil { + return err + } + } + } - return err + return nil } func (s *BoltDocService) Query(collection *document.Collection, expressions []document.QueryExpression, limit int, pagingToken map[string]string) (*document.QueryResult, error) { @@ -321,3 +337,18 @@ func toSdkDoc(col *document.Collection, doc BoltDoc) *document.Document { }, } } + +func fetchChildDocs(key *document.Key, db *storm.DB) ([]BoltDoc, error) { + var childDocs []BoltDoc + + err := db.Find(partionKeyName, key.Id, &childDocs) + if err != nil { + if err.Error() == "not found" { + return childDocs, nil + } else { + return nil, err + } + } + + return childDocs, nil +} diff --git a/pkg/plugins/document/dynamodb/dynamodb.go b/pkg/plugins/document/dynamodb/dynamodb.go index 67636279e..3671eaeae 100644 --- a/pkg/plugins/document/dynamodb/dynamodb.go +++ b/pkg/plugins/document/dynamodb/dynamodb.go @@ -31,6 +31,8 @@ import ( const ATTRIB_PK = "_pk" const ATTRIB_SK = "_sk" +const deleteQueryLimit = int64(1000) +const maxBatchWrite = 25 // DynamoDocService - AWS DynamoDB AWS Nitric Document service type DynamoDocService struct { @@ -121,17 +123,37 @@ func (s *DynamoDocService) Delete(key *document.Key) error { return fmt.Errorf("failed to marshal keys: %v", key) } - input := &dynamodb.DeleteItemInput{ + deleteInput := &dynamodb.DeleteItemInput{ Key: attributeMap, TableName: getTableName(*key.Collection), } - _, err = s.client.DeleteItem(input) + _, err = s.client.DeleteItem(deleteInput) if err != nil { return fmt.Errorf("error deleting %v item %v : %v", key.Collection, key.Id, err) } - // TODO: delete sub collection records + // Delete sub collection items + if key.Collection.Parent == nil { + + var lastEvaluatedKey map[string]*dynamodb.AttributeValue + for { + queryInput := createDeleteQuery(key, lastEvaluatedKey) + resp, err := s.client.Query(queryInput) + if err != nil { + return fmt.Errorf("error performing delete: %v", err) + } + + err = s.processDeleteQuery(key, resp) + if err != nil { + return fmt.Errorf("error performing delete: %v", err) + } + + if len(lastEvaluatedKey) == 0 { + break + } + } + } return nil } @@ -542,3 +564,64 @@ func getTableName(collection document.Collection) *string { return aws.String(coll.Name) } + +func createDeleteQuery(key *document.Key, startKey map[string]*dynamodb.AttributeValue) *dynamodb.QueryInput { + limit := int64(deleteQueryLimit) + + return &dynamodb.QueryInput{ + TableName: getTableName(*key.Collection), + Limit: &(limit), + Select: aws.String(dynamodb.SelectSpecificAttributes), + ProjectionExpression: aws.String("#pk, #sk"), + KeyConditionExpression: aws.String("#pk = :pk"), + ExpressionAttributeNames: map[string]*string{ + "#pk": aws.String("_pk"), + "#sk": aws.String("_sk"), + }, + ExpressionAttributeValues: map[string]*dynamodb.AttributeValue{ + ":pk": { + S: aws.String(key.Id), + }, + }, + ExclusiveStartKey: startKey, + } +} + +func (s *DynamoDocService) processDeleteQuery(key *document.Key, resp *dynamodb.QueryOutput) error { + + itemIndex := 0 + for itemIndex < len(resp.Items) { + + batchInput := &dynamodb.BatchWriteItemInput{} + batchInput.RequestItems = make(map[string][]*dynamodb.WriteRequest) + writeRequests := make([]*dynamodb.WriteRequest, 0, maxBatchWrite) + + batchCount := 0 + for batchCount < maxBatchWrite && itemIndex < len(resp.Items) { + item := resp.Items[itemIndex] + itemIndex += 1 + + writeRequest := dynamodb.WriteRequest{} + + writeRequest.DeleteRequest = &dynamodb.DeleteRequest{ + Key: map[string]*dynamodb.AttributeValue{ + ATTRIB_PK: item[ATTRIB_PK], + ATTRIB_SK: item[ATTRIB_SK], + }, + } + writeRequests = append(writeRequests, &writeRequest) + + batchCount += 1 + } + + batchInput.RequestItems = make(map[string][]*dynamodb.WriteRequest) + batchInput.RequestItems[key.Collection.Name] = writeRequests + + _, err := s.client.BatchWriteItem(batchInput) + if err != nil { + return err + } + } + + return nil +} diff --git a/pkg/plugins/document/firestore/firestore.go b/pkg/plugins/document/firestore/firestore.go index 91e149a60..6b8149c98 100644 --- a/pkg/plugins/document/firestore/firestore.go +++ b/pkg/plugins/document/firestore/firestore.go @@ -82,13 +82,48 @@ func (s *FirestoreDocService) Delete(key *document.Key) error { doc := s.getDocRef(key) + // Delete any sub collection documents + collsIter := doc.Collections(s.context) + for subCol, err := collsIter.Next(); err != iterator.Done; subCol, err = collsIter.Next() { + if err != nil { + return fmt.Errorf("error deleting value: %v", err) + } + + // Loop over sub collection documents, performing batch deletes + // up to Firestore's maximum batch size + const maxBatchSize = 500 + for { + docsIter := subCol.Limit(maxBatchSize).Documents(s.context) + numDeleted := 0 + + batch := s.client.Batch() + for subDoc, err := docsIter.Next(); err != iterator.Done; subDoc, err = docsIter.Next() { + if err != nil { + return err + } + + batch.Delete(subDoc.Ref) + numDeleted++ + } + + // If no more to delete, completed + if numDeleted == 0 { + break + } + + _, err := batch.Commit(s.context) + if err != nil { + return err + } + } + } + + // Delete document _, err = doc.Delete(s.context) if err != nil { return fmt.Errorf("error deleting value: %v", err) } - // TODO: delete sub collection records - return nil } @@ -147,12 +182,7 @@ func (s *FirestoreDocService) Query(collection *document.Collection, expressions } itr := query.Documents(s.context) - - for { - docSnp, err := itr.Next() - if err == iterator.Done { - break - } + for docSnp, err := itr.Next(); err != iterator.Done; docSnp, err = itr.Next() { if err != nil { return nil, fmt.Errorf("error querying value: %v", err) } diff --git a/pkg/plugins/gateway/app_platform/http.go b/pkg/plugins/gateway/app_platform/http.go index a55188cba..60e8cdba4 100644 --- a/pkg/plugins/gateway/app_platform/http.go +++ b/pkg/plugins/gateway/app_platform/http.go @@ -29,7 +29,7 @@ import ( type HttpGateway struct { address string server *fasthttp.Server - sdk.UnimplementedGatewayPlugin + gateway.UnimplementedGatewayPlugin } func httpHandler(pool worker.WorkerPool) func(ctx *fasthttp.RequestCtx) { diff --git a/pkg/providers/azure/plugin.go b/pkg/providers/azure/plugin.go index 07fe80302..1614053fd 100644 --- a/pkg/providers/azure/plugin.go +++ b/pkg/providers/azure/plugin.go @@ -22,7 +22,6 @@ import ( "github.com/nitric-dev/membrane/pkg/plugins/queue" "github.com/nitric-dev/membrane/pkg/plugins/storage" "github.com/nitric-dev/membrane/pkg/providers" - "github.com/nitric-dev/membrane/pkg/sdk" ) type AzureServiceFactory struct { @@ -34,12 +33,12 @@ func New() providers.ServiceFactory { // NewDocumentService - Returns Azure _ based document plugin func (p *AzureServiceFactory) NewDocumentService() (document.DocumentService, error) { - return &sdk.UnimplementedDocumentPlugin{}, nil + return &document.UnimplementedDocumentPlugin{}, nil } // NewEventService - Returns Azure _ based events plugin func (p *AzureServiceFactory) NewEventService() (events.EventService, error) { - return &sdk.UnimplementedeventsPlugin{}, nil + return &events.UnimplementedeventsPlugin{}, nil } // NewGatewayService - Returns Azure _ Gateway plugin @@ -49,10 +48,10 @@ func (p *AzureServiceFactory) NewGatewayService() (gateway.GatewayService, error // NewQueueService - Returns Azure _ based queue plugin func (p *AzureServiceFactory) NewQueueService() (queue.QueueService, error) { - return &sdk.UnimplementedQueuePlugin{}, nil + return &queue.UnimplementedQueuePlugin{}, nil } // NewStorageService - Returns Azure _ based storage plugin func (p *AzureServiceFactory) NewStorageService() (storage.StorageService, error) { - return &sdk.UnimplementedStoragePlugin{}, nil + return &storage.UnimplementedStoragePlugin{}, nil } diff --git a/pluggable_membrane.go b/pluggable_membrane.go index c74658363..bbd67afa7 100644 --- a/pluggable_membrane.go +++ b/pluggable_membrane.go @@ -23,7 +23,12 @@ import ( "strings" "github.com/nitric-dev/membrane/pkg/membrane" - "github.com/nitric-dev/membrane/pkg/sdk" + "github.com/nitric-dev/membrane/pkg/plugins/document" + "github.com/nitric-dev/membrane/pkg/plugins/events" + "github.com/nitric-dev/membrane/pkg/plugins/gateway" + "github.com/nitric-dev/membrane/pkg/plugins/queue" + "github.com/nitric-dev/membrane/pkg/plugins/storage" + "github.com/nitric-dev/membrane/pkg/providers" "github.com/nitric-dev/membrane/pkg/utils" ) @@ -53,12 +58,12 @@ func main() { log.Println(fmt.Sprintf("failed to parse TOLERATE_MISSING_SERVICES environment variable with value [%s], defaulting to false", tolerateMissingServices)) tolerateMissing = false } - var serviceFactory sdk.ServiceFactory = nil + var serviceFactory providers.ServiceFactory = nil // Load the Plugin Factory if plug, err := plugin.Open(fmt.Sprintf("%s/%s", pluginDir, serviceFactoryPluginFile)); err == nil { if symbol, err := plug.Lookup("New"); err == nil { - if newFunc, ok := symbol.(func() (sdk.ServiceFactory, error)); ok { + if newFunc, ok := symbol.(func() (providers.ServiceFactory, error)); ok { if serviceFactoryPlugin, err := newFunc(); err == nil { serviceFactory = serviceFactoryPlugin } @@ -70,18 +75,18 @@ func main() { } // Load the concrete service implementations - var documentService sdk.DocumentService = nil - var eventingService sdk.EventService = nil - var gatewayService sdk.GatewayService = nil - var queueService sdk.QueueService = nil - var storageService sdk.StorageService = nil + var documentService document.DocumentService = nil + var eventService events.EventService = nil + var gatewayService gateway.GatewayService = nil + var queueService queue.QueueService = nil + var storageService storage.StorageService = nil // Load the document service if documentService, err = serviceFactory.NewDocumentService(); err != nil { log.Fatal(err) } // Load the eventing service - if eventingService, err = serviceFactory.NewEventService(); err != nil { + if eventService, err = serviceFactory.NewEventService(); err != nil { log.Fatal(err) } // Load the gateway service @@ -103,7 +108,7 @@ func main() { ChildAddress: childAddress, ChildCommand: childCommand, DocumentPlugin: documentService, - EventingPlugin: eventingService, + EventsPlugin: eventService, StoragePlugin: storageService, GatewayPlugin: gatewayService, QueuePlugin: queueService, diff --git a/tests/plugins/document/suite.go b/tests/plugins/document/suite.go index 6cf245906..48748b7c1 100644 --- a/tests/plugins/document/suite.go +++ b/tests/plugins/document/suite.go @@ -455,9 +455,28 @@ func DeleteTests(docPlugin document.DocumentService) { It("Should delete all children", func() { LoadCustomersData(docPlugin) - err := docPlugin.Delete(&Customer1.Key) + col := document.Collection{ + Name: "orders", + Parent: &document.Key{ + Collection: &document.Collection{ + Name: "customers", + }, + }, + } + + result, err := docPlugin.Query(&col, []document.QueryExpression{}, 0, nil) + Expect(err).To(BeNil()) + Expect(result.Documents).To(HaveLen(5)) + + err = docPlugin.Delete(&Customer1.Key) + Expect(err).ShouldNot(HaveOccurred()) + + err = docPlugin.Delete(&Customer2.Key) Expect(err).ShouldNot(HaveOccurred()) - // TODO: ensure Customer1.Orders are deleted + + result, err = docPlugin.Query(&col, []document.QueryExpression{}, 0, nil) + Expect(err).To(BeNil()) + Expect(result.Documents).To(HaveLen(0)) }) }) })