Skip to content

Commit

Permalink
Merge pull request #96 from nitrictech/feature/child-delete
Browse files Browse the repository at this point in the history
feat: add delete child docs
  • Loading branch information
medgar-nitric authored Aug 3, 2021
2 parents 04eb070 + 258d157 commit 11ac213
Show file tree
Hide file tree
Showing 8 changed files with 199 additions and 32 deletions.
2 changes: 1 addition & 1 deletion contracts
35 changes: 33 additions & 2 deletions pkg/plugins/document/boltdb/boltdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,26 @@ func (s *BoltDocService) Delete(key *document.Key) error {
doc := createDoc(key)

err = db.DeleteStruct(&doc)
if err != nil {
return err
}

// Delete sub collection documents
if key.Collection.Parent == nil {
childDocs, err := fetchChildDocs(key, db)
if err != nil {
return err
}

// TODO: delete sub collection records
for _, childDoc := range childDocs {
err = db.DeleteStruct(&childDoc)
if err != nil {
return err
}
}
}

return err
return nil
}

func (s *BoltDocService) Query(collection *document.Collection, expressions []document.QueryExpression, limit int, pagingToken map[string]string) (*document.QueryResult, error) {
Expand Down Expand Up @@ -321,3 +337,18 @@ func toSdkDoc(col *document.Collection, doc BoltDoc) *document.Document {
},
}
}

func fetchChildDocs(key *document.Key, db *storm.DB) ([]BoltDoc, error) {
var childDocs []BoltDoc

err := db.Find(partionKeyName, key.Id, &childDocs)
if err != nil {
if err.Error() == "not found" {
return childDocs, nil
} else {
return nil, err
}
}

return childDocs, nil
}
89 changes: 86 additions & 3 deletions pkg/plugins/document/dynamodb/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import (

const ATTRIB_PK = "_pk"
const ATTRIB_SK = "_sk"
const deleteQueryLimit = int64(1000)
const maxBatchWrite = 25

// DynamoDocService - AWS DynamoDB AWS Nitric Document service
type DynamoDocService struct {
Expand Down Expand Up @@ -121,17 +123,37 @@ func (s *DynamoDocService) Delete(key *document.Key) error {
return fmt.Errorf("failed to marshal keys: %v", key)
}

input := &dynamodb.DeleteItemInput{
deleteInput := &dynamodb.DeleteItemInput{
Key: attributeMap,
TableName: getTableName(*key.Collection),
}

_, err = s.client.DeleteItem(input)
_, err = s.client.DeleteItem(deleteInput)
if err != nil {
return fmt.Errorf("error deleting %v item %v : %v", key.Collection, key.Id, err)
}

// TODO: delete sub collection records
// Delete sub collection items
if key.Collection.Parent == nil {

var lastEvaluatedKey map[string]*dynamodb.AttributeValue
for {
queryInput := createDeleteQuery(key, lastEvaluatedKey)
resp, err := s.client.Query(queryInput)
if err != nil {
return fmt.Errorf("error performing delete: %v", err)
}

err = s.processDeleteQuery(key, resp)
if err != nil {
return fmt.Errorf("error performing delete: %v", err)
}

if len(lastEvaluatedKey) == 0 {
break
}
}
}

return nil
}
Expand Down Expand Up @@ -542,3 +564,64 @@ func getTableName(collection document.Collection) *string {

return aws.String(coll.Name)
}

func createDeleteQuery(key *document.Key, startKey map[string]*dynamodb.AttributeValue) *dynamodb.QueryInput {
limit := int64(deleteQueryLimit)

return &dynamodb.QueryInput{
TableName: getTableName(*key.Collection),
Limit: &(limit),
Select: aws.String(dynamodb.SelectSpecificAttributes),
ProjectionExpression: aws.String("#pk, #sk"),
KeyConditionExpression: aws.String("#pk = :pk"),
ExpressionAttributeNames: map[string]*string{
"#pk": aws.String("_pk"),
"#sk": aws.String("_sk"),
},
ExpressionAttributeValues: map[string]*dynamodb.AttributeValue{
":pk": {
S: aws.String(key.Id),
},
},
ExclusiveStartKey: startKey,
}
}

func (s *DynamoDocService) processDeleteQuery(key *document.Key, resp *dynamodb.QueryOutput) error {

itemIndex := 0
for itemIndex < len(resp.Items) {

batchInput := &dynamodb.BatchWriteItemInput{}
batchInput.RequestItems = make(map[string][]*dynamodb.WriteRequest)
writeRequests := make([]*dynamodb.WriteRequest, 0, maxBatchWrite)

batchCount := 0
for batchCount < maxBatchWrite && itemIndex < len(resp.Items) {
item := resp.Items[itemIndex]
itemIndex += 1

writeRequest := dynamodb.WriteRequest{}

writeRequest.DeleteRequest = &dynamodb.DeleteRequest{
Key: map[string]*dynamodb.AttributeValue{
ATTRIB_PK: item[ATTRIB_PK],
ATTRIB_SK: item[ATTRIB_SK],
},
}
writeRequests = append(writeRequests, &writeRequest)

batchCount += 1
}

batchInput.RequestItems = make(map[string][]*dynamodb.WriteRequest)
batchInput.RequestItems[key.Collection.Name] = writeRequests

_, err := s.client.BatchWriteItem(batchInput)
if err != nil {
return err
}
}

return nil
}
46 changes: 38 additions & 8 deletions pkg/plugins/document/firestore/firestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,48 @@ func (s *FirestoreDocService) Delete(key *document.Key) error {

doc := s.getDocRef(key)

// Delete any sub collection documents
collsIter := doc.Collections(s.context)
for subCol, err := collsIter.Next(); err != iterator.Done; subCol, err = collsIter.Next() {
if err != nil {
return fmt.Errorf("error deleting value: %v", err)
}

// Loop over sub collection documents, performing batch deletes
// up to Firestore's maximum batch size
const maxBatchSize = 500
for {
docsIter := subCol.Limit(maxBatchSize).Documents(s.context)
numDeleted := 0

batch := s.client.Batch()
for subDoc, err := docsIter.Next(); err != iterator.Done; subDoc, err = docsIter.Next() {
if err != nil {
return err
}

batch.Delete(subDoc.Ref)
numDeleted++
}

// If no more to delete, completed
if numDeleted == 0 {
break
}

_, err := batch.Commit(s.context)
if err != nil {
return err
}
}
}

// Delete document
_, err = doc.Delete(s.context)
if err != nil {
return fmt.Errorf("error deleting value: %v", err)
}

// TODO: delete sub collection records

return nil
}

Expand Down Expand Up @@ -147,12 +182,7 @@ func (s *FirestoreDocService) Query(collection *document.Collection, expressions
}

itr := query.Documents(s.context)

for {
docSnp, err := itr.Next()
if err == iterator.Done {
break
}
for docSnp, err := itr.Next(); err != iterator.Done; docSnp, err = itr.Next() {
if err != nil {
return nil, fmt.Errorf("error querying value: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/plugins/gateway/app_platform/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
type HttpGateway struct {
address string
server *fasthttp.Server
sdk.UnimplementedGatewayPlugin
gateway.UnimplementedGatewayPlugin
}

func httpHandler(pool worker.WorkerPool) func(ctx *fasthttp.RequestCtx) {
Expand Down
9 changes: 4 additions & 5 deletions pkg/providers/azure/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/nitric-dev/membrane/pkg/plugins/queue"
"github.com/nitric-dev/membrane/pkg/plugins/storage"
"github.com/nitric-dev/membrane/pkg/providers"
"github.com/nitric-dev/membrane/pkg/sdk"
)

type AzureServiceFactory struct {
Expand All @@ -34,12 +33,12 @@ func New() providers.ServiceFactory {

// NewDocumentService - Returns Azure _ based document plugin
func (p *AzureServiceFactory) NewDocumentService() (document.DocumentService, error) {
return &sdk.UnimplementedDocumentPlugin{}, nil
return &document.UnimplementedDocumentPlugin{}, nil
}

// NewEventService - Returns Azure _ based events plugin
func (p *AzureServiceFactory) NewEventService() (events.EventService, error) {
return &sdk.UnimplementedeventsPlugin{}, nil
return &events.UnimplementedeventsPlugin{}, nil
}

// NewGatewayService - Returns Azure _ Gateway plugin
Expand All @@ -49,10 +48,10 @@ func (p *AzureServiceFactory) NewGatewayService() (gateway.GatewayService, error

// NewQueueService - Returns Azure _ based queue plugin
func (p *AzureServiceFactory) NewQueueService() (queue.QueueService, error) {
return &sdk.UnimplementedQueuePlugin{}, nil
return &queue.UnimplementedQueuePlugin{}, nil
}

// NewStorageService - Returns Azure _ based storage plugin
func (p *AzureServiceFactory) NewStorageService() (storage.StorageService, error) {
return &sdk.UnimplementedStoragePlugin{}, nil
return &storage.UnimplementedStoragePlugin{}, nil
}
25 changes: 15 additions & 10 deletions pluggable_membrane.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,12 @@ import (
"strings"

"github.com/nitric-dev/membrane/pkg/membrane"
"github.com/nitric-dev/membrane/pkg/sdk"
"github.com/nitric-dev/membrane/pkg/plugins/document"
"github.com/nitric-dev/membrane/pkg/plugins/events"
"github.com/nitric-dev/membrane/pkg/plugins/gateway"
"github.com/nitric-dev/membrane/pkg/plugins/queue"
"github.com/nitric-dev/membrane/pkg/plugins/storage"
"github.com/nitric-dev/membrane/pkg/providers"
"github.com/nitric-dev/membrane/pkg/utils"
)

Expand Down Expand Up @@ -53,12 +58,12 @@ func main() {
log.Println(fmt.Sprintf("failed to parse TOLERATE_MISSING_SERVICES environment variable with value [%s], defaulting to false", tolerateMissingServices))
tolerateMissing = false
}
var serviceFactory sdk.ServiceFactory = nil
var serviceFactory providers.ServiceFactory = nil

// Load the Plugin Factory
if plug, err := plugin.Open(fmt.Sprintf("%s/%s", pluginDir, serviceFactoryPluginFile)); err == nil {
if symbol, err := plug.Lookup("New"); err == nil {
if newFunc, ok := symbol.(func() (sdk.ServiceFactory, error)); ok {
if newFunc, ok := symbol.(func() (providers.ServiceFactory, error)); ok {
if serviceFactoryPlugin, err := newFunc(); err == nil {
serviceFactory = serviceFactoryPlugin
}
Expand All @@ -70,18 +75,18 @@ func main() {
}

// Load the concrete service implementations
var documentService sdk.DocumentService = nil
var eventingService sdk.EventService = nil
var gatewayService sdk.GatewayService = nil
var queueService sdk.QueueService = nil
var storageService sdk.StorageService = nil
var documentService document.DocumentService = nil
var eventService events.EventService = nil
var gatewayService gateway.GatewayService = nil
var queueService queue.QueueService = nil
var storageService storage.StorageService = nil

// Load the document service
if documentService, err = serviceFactory.NewDocumentService(); err != nil {
log.Fatal(err)
}
// Load the eventing service
if eventingService, err = serviceFactory.NewEventService(); err != nil {
if eventService, err = serviceFactory.NewEventService(); err != nil {
log.Fatal(err)
}
// Load the gateway service
Expand All @@ -103,7 +108,7 @@ func main() {
ChildAddress: childAddress,
ChildCommand: childCommand,
DocumentPlugin: documentService,
EventingPlugin: eventingService,
EventsPlugin: eventService,
StoragePlugin: storageService,
GatewayPlugin: gatewayService,
QueuePlugin: queueService,
Expand Down
23 changes: 21 additions & 2 deletions tests/plugins/document/suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,9 +455,28 @@ func DeleteTests(docPlugin document.DocumentService) {
It("Should delete all children", func() {
LoadCustomersData(docPlugin)

err := docPlugin.Delete(&Customer1.Key)
col := document.Collection{
Name: "orders",
Parent: &document.Key{
Collection: &document.Collection{
Name: "customers",
},
},
}

result, err := docPlugin.Query(&col, []document.QueryExpression{}, 0, nil)
Expect(err).To(BeNil())
Expect(result.Documents).To(HaveLen(5))

err = docPlugin.Delete(&Customer1.Key)
Expect(err).ShouldNot(HaveOccurred())

err = docPlugin.Delete(&Customer2.Key)
Expect(err).ShouldNot(HaveOccurred())
// TODO: ensure Customer1.Orders are deleted

result, err = docPlugin.Query(&col, []document.QueryExpression{}, 0, nil)
Expect(err).To(BeNil())
Expect(result.Documents).To(HaveLen(0))
})
})
})
Expand Down

0 comments on commit 11ac213

Please sign in to comment.