Backport 1.4.1: identity: Add batch entity deletion endpoint (#8785) #8830

Merged: 1 commit, Apr 23, 2020
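In summary, this backport brings the batch entity deletion work from #8785 to the 1.4.x line: a new identity/entity/batch-delete endpoint accepts a list of entity IDs, StoragePacker.DeleteMultipleItems is reworked to take a []string and to group deletions so each storage bucket is read and written exactly once, and handleEntityDeleteCommon gains an update flag so batch callers can skip the per-entity storage delete that the single-entity paths still perform.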
142 changes: 57 additions & 85 deletions helper/storagepacker/storagepacker.go
@@ -127,113 +127,85 @@ func (s *StoragePacker) BucketKey(itemID string) string {
 
 // DeleteItem removes the item from the respective bucket
 func (s *StoragePacker) DeleteItem(_ context.Context, itemID string) error {
-	return s.DeleteMultipleItems(context.Background(), nil, itemID)
+	return s.DeleteMultipleItems(context.Background(), nil, []string{itemID})
 }
 
-func (s *StoragePacker) DeleteMultipleItems(ctx context.Context, logger hclog.Logger, itemIDs ...string) error {
+func (s *StoragePacker) DeleteMultipleItems(ctx context.Context, logger hclog.Logger, itemIDs []string) error {
 	defer metrics.MeasureSince([]string{"storage_packer", "delete_items"}, time.Now())
-	var err error
-	switch len(itemIDs) {
-	case 0:
-		// Nothing
+	if len(itemIDs) == 0 {
 		return nil
-
-	case 1:
-		logger = hclog.NewNullLogger()
-		fallthrough
-
-	default:
-		lockIndexes := make(map[string]struct{}, len(s.storageLocks))
-		for _, itemID := range itemIDs {
-			bucketKey := s.BucketKey(itemID)
-			if _, ok := lockIndexes[bucketKey]; !ok {
-				lockIndexes[bucketKey] = struct{}{}
-			}
-		}
-
-		lockKeys := make([]string, 0, len(lockIndexes))
-		for k := range lockIndexes {
-			lockKeys = append(lockKeys, k)
-		}
-
-		locks := locksutil.LocksForKeys(s.storageLocks, lockKeys)
-		for _, lock := range locks {
-			lock.Lock()
-			defer lock.Unlock()
-		}
 	}
 
 	if logger == nil {
 		logger = hclog.NewNullLogger()
 	}
 
-	bucketCache := make(map[string]*Bucket, len(s.storageLocks))
-
-	logger.Debug("deleting multiple items from storagepacker; caching and deleting from buckets", "total_items", len(itemIDs))
-
-	var pctDone int
-	for idx, itemID := range itemIDs {
-		bucketKey := s.BucketKey(itemID)
-
-		bucket, bucketFound := bucketCache[bucketKey]
-		if !bucketFound {
-			// Read from storage
-			storageEntry, err := s.view.Get(context.Background(), bucketKey)
-			if err != nil {
-				return errwrap.Wrapf("failed to read packed storage value: {{err}}", err)
-			}
-			if storageEntry == nil {
-				return nil
-			}
-
-			uncompressedData, notCompressed, err := compressutil.Decompress(storageEntry.Value)
-			if err != nil {
-				return errwrap.Wrapf("failed to decompress packed storage value: {{err}}", err)
-			}
-			if notCompressed {
-				uncompressedData = storageEntry.Value
-			}
-
-			bucket = new(Bucket)
-			err = proto.Unmarshal(uncompressedData, bucket)
-			if err != nil {
-				return errwrap.Wrapf("failed decoding packed storage entry: {{err}}", err)
-			}
-		}
-
-		// Look for a matching storage entry
-		foundIdx := -1
-		for itemIdx, item := range bucket.Items {
-			if item.ID == itemID {
-				foundIdx = itemIdx
-				break
-			}
-		}
-
-		// If there is a match, remove it from the collection and persist the
-		// resulting collection
-		if foundIdx != -1 {
-			bucket.Items[foundIdx] = bucket.Items[len(bucket.Items)-1]
-			bucket.Items = bucket.Items[:len(bucket.Items)-1]
-			if !bucketFound {
-				bucketCache[bucketKey] = bucket
-			}
-		}
-
-		newPctDone := idx * 100.0 / len(itemIDs)
-		if int(newPctDone) > pctDone {
-			pctDone = int(newPctDone)
-			logger.Trace("bucket item removal progress", "percent", pctDone, "items_removed", idx)
-		}
-	}
-
-	logger.Debug("persisting buckets", "total_buckets", len(bucketCache))
-
-	// Persist all buckets in the cache; these will be the ones that had
-	// deletions
-	pctDone = 0
-	idx := 0
-	for _, bucket := range bucketCache {
+	// Sort the ids by the bucket they will be deleted from
+	lockKeys := make([]string, 0)
+	byBucket := make(map[string]map[string]struct{})
+	for _, id := range itemIDs {
+		bucketKey := s.BucketKey(id)
+		bucket, ok := byBucket[bucketKey]
+		if !ok {
+			bucket = make(map[string]struct{})
+			byBucket[bucketKey] = bucket
+
+			// Add the lock key once
+			lockKeys = append(lockKeys, bucketKey)
+		}
+
+		bucket[id] = struct{}{}
+	}
+
+	locks := locksutil.LocksForKeys(s.storageLocks, lockKeys)
+	for _, lock := range locks {
+		lock.Lock()
+		defer lock.Unlock()
+	}
+
+	logger.Debug("deleting multiple items from storagepacker; caching and deleting from buckets", "total_items", len(itemIDs))
+
+	// For each bucket, load it from storage, remove the necessary items, and
+	// write it back out to storage
+	pctDone := 0
+	idx := 0
+	for bucketKey, itemsToRemove := range byBucket {
+		// Read the bucket from storage
+		storageEntry, err := s.view.Get(context.Background(), bucketKey)
+		if err != nil {
+			return errwrap.Wrapf("failed to read packed storage value: {{err}}", err)
+		}
+		if storageEntry == nil {
+			logger.Warn("could not find bucket", "bucket", bucketKey)
+			continue
+		}
+
+		uncompressedData, notCompressed, err := compressutil.Decompress(storageEntry.Value)
+		if err != nil {
+			return errwrap.Wrapf("failed to decompress packed storage value: {{err}}", err)
+		}
+		if notCompressed {
+			uncompressedData = storageEntry.Value
+		}
+
+		bucket := new(Bucket)
+		err = proto.Unmarshal(uncompressedData, bucket)
+		if err != nil {
+			return errwrap.Wrapf("failed decoding packed storage entry: {{err}}", err)
+		}
+
+		// Look for matching storage entries and delete them from the list.
+		for i := 0; i < len(bucket.Items); i++ {
+			if _, ok := itemsToRemove[bucket.Items[i].ID]; ok {
+				bucket.Items[i] = bucket.Items[len(bucket.Items)-1]
+				bucket.Items = bucket.Items[:len(bucket.Items)-1]
+
+				// Since we just moved a value to position i we need to
+				// decrement i so we replay this position
+				i--
+			}
+		}
+
 		// Fail if the context is canceled, the storage calls will fail anyways
 		if ctx.Err() != nil {
 			return ctx.Err()
@@ -244,7 +216,7 @@ func (s *StoragePacker) DeleteMultipleItems(ctx context.Context, logger hclog.Lo
 			return err
 		}
 
-		newPctDone := idx * 100.0 / len(bucketCache)
+		newPctDone := idx * 100.0 / len(byBucket)
 		if int(newPctDone) > pctDone {
 			pctDone = int(newPctDone)
 			logger.Trace("bucket persistence progress", "percent", pctDone, "buckets_persisted", idx)
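For readers skimming the diff, the core of the new DeleteMultipleItems is two small patterns: group the incoming IDs by bucket so each bucket is loaded and persisted exactly once, and delete matches with an order-destroying swap-remove. Below is a minimal standalone sketch of both; the Item type is simplified and bucketKey is a made-up hash standing in for StoragePacker.BucketKey (the real packer decodes protobuf buckets from storage).

package main

import "fmt"

// Item mirrors the storagepacker item shape for this sketch; only the ID
// matters for deletion.
type Item struct{ ID string }

// bucketKey stands in for StoragePacker.BucketKey: an illustrative hash
// with a fixed fan-out of 4 buckets.
func bucketKey(id string) string {
	sum := 0
	for _, c := range id {
		sum += int(c)
	}
	return fmt.Sprintf("packer/buckets/%d", sum%4)
}

func main() {
	itemIDs := []string{"a", "b", "c", "d", "e"}

	// Group the IDs by destination bucket so each bucket is read and
	// written exactly once, mirroring the byBucket map in the patch.
	byBucket := make(map[string]map[string]struct{})
	for _, id := range itemIDs {
		key := bucketKey(id)
		if byBucket[key] == nil {
			byBucket[key] = make(map[string]struct{})
		}
		byBucket[key][id] = struct{}{}
	}
	fmt.Println(len(byBucket), "buckets to rewrite")

	// One in-memory bucket standing in for a decoded protobuf Bucket.
	items := []Item{{"a"}, {"x"}, {"b"}, {"y"}}
	toRemove := map[string]struct{}{"a": {}, "b": {}}

	// Swap-remove: overwrite position i with the last element, shrink the
	// slice, and decrement i so the swapped-in element is re-checked.
	for i := 0; i < len(items); i++ {
		if _, ok := toRemove[items[i].ID]; ok {
			items[i] = items[len(items)-1]
			items = items[:len(items)-1]
			i--
		}
	}
	fmt.Println(items) // [{y} {x}]: order is not preserved
}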
55 changes: 54 additions & 1 deletion helper/storagepacker/storagepacker_test.go
@@ -217,7 +217,7 @@ func TestStoragePacker_DeleteMultiple(t *testing.T) {
 		itemsToDelete = append(itemsToDelete, fmt.Sprintf("item%d", i))
 	}
 
-	err = storagePacker.DeleteMultipleItems(ctx, nil, itemsToDelete...)
+	err = storagePacker.DeleteMultipleItems(ctx, nil, itemsToDelete)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -237,3 +237,56 @@
 		}
 	}
 }
+
+func TestStoragePacker_DeleteMultiple_ALL(t *testing.T) {
+	storagePacker, err := NewStoragePacker(&logical.InmemStorage{}, log.New(&log.LoggerOptions{Name: "storagepackertest"}), "")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	ctx := context.Background()
+
+	// Persist 10000 items and collect their IDs for deletion
+	itemsToDelete := make([]string, 0, 10000)
+	for i := 0; i < 10000; i++ {
+		item := &Item{
+			ID: fmt.Sprintf("item%d", i),
+		}
+
+		err = storagePacker.PutItem(ctx, item)
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		// Verify that it can be read
+		fetchedItem, err := storagePacker.GetItem(item.ID)
+		if err != nil {
+			t.Fatal(err)
+		}
+		if fetchedItem == nil {
+			t.Fatalf("failed to read the stored item")
+		}
+
+		if item.ID != fetchedItem.ID {
+			t.Fatalf("bad: item ID; expected: %q\n actual: %q\n", item.ID, fetchedItem.ID)
+		}
+
+		itemsToDelete = append(itemsToDelete, fmt.Sprintf("item%d", i))
+	}
+
+	err = storagePacker.DeleteMultipleItems(ctx, nil, itemsToDelete)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Check that the deletion was successful
+	for _, item := range itemsToDelete {
+		fetchedItem, err := storagePacker.GetItem(item)
+		if err != nil {
+			t.Fatal(err)
+		}
+		if fetchedItem != nil {
+			t.Fatal("item not deleted")
+		}
+	}
+}
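One call-site consequence shown by the test change above: DeleteMultipleItems now takes a []string rather than variadic strings, so anyone carrying patches on top of 1.4.x needs a mechanical update. A sketch, assuming a packer, ctx, and an ids slice already in scope:

// before this change: err = packer.DeleteMultipleItems(ctx, nil, ids...)
// after this change:
err = packer.DeleteMultipleItems(ctx, nil, ids)
if err != nil {
	return err
}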
111 changes: 104 additions & 7 deletions vault/identity_store_entities.go
@@ -89,6 +89,21 @@ func entityPaths(i *IdentityStore) []*framework.Path {
 			HelpSynopsis:    strings.TrimSpace(entityHelp["entity-id"][0]),
 			HelpDescription: strings.TrimSpace(entityHelp["entity-id"][1]),
 		},
+		{
+			Pattern: "entity/batch-delete",
+			Fields: map[string]*framework.FieldSchema{
+				"entity_ids": {
+					Type:        framework.TypeCommaStringSlice,
+					Description: "Entity IDs to delete",
+				},
+			},
+			Callbacks: map[logical.Operation]framework.OperationFunc{
+				logical.UpdateOperation: i.handleEntityBatchDelete(),
+			},
+
+			HelpSynopsis:    strings.TrimSpace(entityHelp["batch-delete"][0]),
+			HelpDescription: strings.TrimSpace(entityHelp["batch-delete"][1]),
+		},
 		{
 			Pattern: "entity/name/?$",
 			Callbacks: map[logical.Operation]framework.OperationFunc{
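The new path registers under the identity store, so once this ships it should be callable as an update (write) to identity/entity/batch-delete; since the field is a comma string slice, a JSON list of IDs is accepted. A sketch using the official Go API client, with placeholder entity IDs (VAULT_ADDR and VAULT_TOKEN are picked up from the environment):

package main

import (
	stdlog "log"

	"github.com/hashicorp/vault/api"
)

func main() {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		stdlog.Fatal(err)
	}

	// logical.UpdateOperation maps to a write against the path.
	_, err = client.Logical().Write("identity/entity/batch-delete", map[string]interface{}{
		"entity_ids": []string{"entity-id-1", "entity-id-2"},
	})
	if err != nil {
		stdlog.Fatal(err)
	}
}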
@@ -420,7 +435,7 @@ func (i *IdentityStore) pathEntityIDDelete() framework.OperationFunc {
 			return nil, nil
 		}
 
-		err = i.handleEntityDeleteCommon(ctx, txn, entity)
+		err = i.handleEntityDeleteCommon(ctx, txn, entity, true)
 		if err != nil {
 			return nil, err
 		}
@@ -464,7 +479,7 @@ func (i *IdentityStore) pathEntityNameDelete() framework.OperationFunc {
 			return nil, nil
 		}
 
-		err = i.handleEntityDeleteCommon(ctx, txn, entity)
+		err = i.handleEntityDeleteCommon(ctx, txn, entity, true)
 		if err != nil {
 			return nil, err
 		}
@@ -475,7 +490,83 @@
 	}
 }
 
-func (i *IdentityStore) handleEntityDeleteCommon(ctx context.Context, txn *memdb.Txn, entity *identity.Entity) error {
+// handleEntityBatchDelete deletes a batch of entities by their IDs
+func (i *IdentityStore) handleEntityBatchDelete() framework.OperationFunc {
+	return func(ctx context.Context, req *logical.Request, d *framework.FieldData) (*logical.Response, error) {
+		entityIDs := d.Get("entity_ids").([]string)
+		if len(entityIDs) == 0 {
+			return logical.ErrorResponse("missing entity ids to delete"), nil
+		}
+
+		// Sort the ids by the bucket they will be deleted from
+		byBucket := make(map[string]map[string]struct{})
+		for _, id := range entityIDs {
+			bucketKey := i.entityPacker.BucketKey(id)
+
+			bucket, ok := byBucket[bucketKey]
+			if !ok {
+				bucket = make(map[string]struct{})
+				byBucket[bucketKey] = bucket
+			}
+
+			bucket[id] = struct{}{}
+		}
+
+		deleteIdsForBucket := func(entityIDs []string) error {
+			i.lock.Lock()
+			defer i.lock.Unlock()
+
+			// Create a MemDB transaction to delete entities from the inmem
+			// database without altering storage. Batch deletion on storage
+			// bucket items is performed directly through entityPacker.
+			txn := i.db.Txn(true)
+			defer txn.Abort()
+
+			for _, entityID := range entityIDs {
+				// Fetch the entity using its ID
+				entity, err := i.MemDBEntityByIDInTxn(txn, entityID, true)
+				if err != nil {
+					return err
+				}
+				if entity == nil {
+					continue
+				}
+
+				err = i.handleEntityDeleteCommon(ctx, txn, entity, false)
+				if err != nil {
+					return err
+				}
+			}
+
+			// Write all updates for this bucket.
+			err := i.entityPacker.DeleteMultipleItems(ctx, i.logger, entityIDs)
+			if err != nil {
+				return err
+			}
+
+			txn.Commit()
+			return nil
+		}
+
+		for _, bucket := range byBucket {
+			ids := make([]string, len(bucket))
+			n := 0
+			for id := range bucket {
+				ids[n] = id
+				n++
+			}
+
+			err := deleteIdsForBucket(ids)
+			if err != nil {
+				return nil, err
+			}
+		}
+
+		return nil, nil
+	}
+}
+
+func (i *IdentityStore) handleEntityDeleteCommon(ctx context.Context, txn *memdb.Txn, entity *identity.Entity, update bool) error {
 	ns, err := namespace.FromContext(ctx)
 	if err != nil {
 		return err
@@ -511,10 +602,12 @@ func (i *IdentityStore) handleEntityDeleteCommon(ctx context.Context, txn *memdb
 		return err
 	}
 
-	// Delete the entity from storage
-	err = i.entityPacker.DeleteItem(ctx, entity.ID)
-	if err != nil {
-		return err
+	if update {
+		// Delete the entity from storage
+		err = i.entityPacker.DeleteItem(ctx, entity.ID)
+		if err != nil {
+			return err
+		}
 	}
 
 	return nil
@@ -772,4 +865,8 @@ var entityHelp = map[string][2]string{
 		"Merge two or more entities together",
 		"",
 	},
+	"batch-delete": {
+		"Delete all of the entities provided",
+		"",
+	},
 }
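The shape of handleEntityBatchDelete is worth isolating: group the IDs by bucket, then, per bucket, perform every in-memory deletion and a single storage write under one lock so they succeed or fail together. A distilled sketch with the identity store's MemDB and storage swapped for stubs; deleteBatch, deleteInMem, and deleteFromStorage are illustrative names, not Vault APIs:

package main

import (
	"fmt"
	"sync"
)

var mu sync.Mutex

// deleteBatch handles one bucket's worth of IDs under the lock: all
// in-memory deletions plus the single storage write for that bucket.
// The real code wraps the in-memory part in a MemDB write transaction
// and only commits after the storage write succeeds.
func deleteBatch(bucket string, ids []string,
	deleteInMem func(string) error,
	deleteFromStorage func(string, []string) error) error {
	mu.Lock()
	defer mu.Unlock()

	for _, id := range ids {
		if err := deleteInMem(id); err != nil {
			return err
		}
	}
	// One storage write per bucket instead of one per entity.
	return deleteFromStorage(bucket, ids)
}

func main() {
	byBucket := map[string][]string{
		"buckets/0": {"e1", "e3"},
		"buckets/1": {"e2"},
	}
	for bucket, ids := range byBucket {
		err := deleteBatch(bucket, ids,
			func(id string) error { fmt.Println("memdb delete:", id); return nil },
			func(b string, ids []string) error { fmt.Println("storage write:", b, ids); return nil },
		)
		if err != nil {
			fmt.Println("error:", err)
			return
		}
	}
}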