From d5330a184fc6a801c1dae0b3b9aef7568969cdc4 Mon Sep 17 00:00:00 2001 From: Brian Kassouf Date: Mon, 20 Apr 2020 14:56:49 -0700 Subject: [PATCH 1/6] identity: Add batch entity deletion endpoint --- helper/storagepacker/storagepacker.go | 140 +++++++----------- helper/storagepacker/storagepacker_test.go | 55 ++++++- vault/identity_store_entities.go | 107 ++++++++++++- vault/identity_store_entities_test.go | 52 +++++++ .../pages/api-docs/secret/identity/entity.mdx | 38 +++++ 5 files changed, 299 insertions(+), 93 deletions(-) diff --git a/helper/storagepacker/storagepacker.go b/helper/storagepacker/storagepacker.go index 5bb7a72571ce..e26adf8c82d8 100644 --- a/helper/storagepacker/storagepacker.go +++ b/helper/storagepacker/storagepacker.go @@ -125,112 +125,82 @@ func (s *StoragePacker) BucketKey(itemID string) string { // DeleteItem removes the item from the respective bucket func (s *StoragePacker) DeleteItem(_ context.Context, itemID string) error { - return s.DeleteMultipleItems(context.Background(), nil, itemID) + return s.DeleteMultipleItems(context.Background(), nil, []string{itemID}) } -func (s *StoragePacker) DeleteMultipleItems(ctx context.Context, logger hclog.Logger, itemIDs ...string) error { - var err error - switch len(itemIDs) { - case 0: - // Nothing +func (s *StoragePacker) DeleteMultipleItems(ctx context.Context, logger hclog.Logger, itemIDs []string) error { + if len(itemIDs) == 0 { return nil + } - case 1: + if logger == nil { logger = hclog.NewNullLogger() - fallthrough - - default: - lockIndexes := make(map[string]struct{}, len(s.storageLocks)) - for _, itemID := range itemIDs { - bucketKey := s.BucketKey(itemID) - if _, ok := lockIndexes[bucketKey]; !ok { - lockIndexes[bucketKey] = struct{}{} - } - } + } - lockKeys := make([]string, 0, len(lockIndexes)) - for k := range lockIndexes { - lockKeys = append(lockKeys, k) + // Sort the ids by the bucket the will be deleted from + lockKeys := make([]string, 0) + byBucket := 
make(map[string]map[string]struct{}) + for _, id := range itemIDs { + bucketKey := s.BucketKey(id) + bucket, ok := byBucket[bucketKey] + if !ok { + bucket = make(map[string]struct{}) } - locks := locksutil.LocksForKeys(s.storageLocks, lockKeys) - for _, lock := range locks { - lock.Lock() - defer lock.Unlock() - } + bucket[id] = struct{}{} + byBucket[bucketKey] = bucket + lockKeys = append(lockKeys, bucketKey) } - if logger == nil { - logger = hclog.NewNullLogger() + locks := locksutil.LocksForKeys(s.storageLocks, lockKeys) + for _, lock := range locks { + lock.Lock() + defer lock.Unlock() } - bucketCache := make(map[string]*Bucket, len(s.storageLocks)) - logger.Debug("deleting multiple items from storagepacker; caching and deleting from buckets", "total_items", len(itemIDs)) - var pctDone int - for idx, itemID := range itemIDs { - bucketKey := s.BucketKey(itemID) - - bucket, bucketFound := bucketCache[bucketKey] - if !bucketFound { - // Read from storage - storageEntry, err := s.view.Get(context.Background(), bucketKey) - if err != nil { - return errwrap.Wrapf("failed to read packed storage value: {{err}}", err) - } - if storageEntry == nil { - return nil - } - - uncompressedData, notCompressed, err := compressutil.Decompress(storageEntry.Value) - if err != nil { - return errwrap.Wrapf("failed to decompress packed storage value: {{err}}", err) - } - if notCompressed { - uncompressedData = storageEntry.Value - } - - bucket = new(Bucket) - err = proto.Unmarshal(uncompressedData, bucket) - if err != nil { - return errwrap.Wrapf("failed decoding packed storage entry: {{err}}", err) - } + // For each bucket, load from storage, remove the necessary items, and + // write it back out to storage + pctDone := 0 + idx := 0 + for bucketKey, itemsToRemove := range byBucket { + // Read bucket from storage + storageEntry, err := s.view.Get(context.Background(), bucketKey) + if err != nil { + return errwrap.Wrapf("failed to read packed storage value: {{err}}", err) } - - // 
Look for a matching storage entry - foundIdx := -1 - for itemIdx, item := range bucket.Items { - if item.ID == itemID { - foundIdx = itemIdx - break - } + if storageEntry == nil { + logger.Warn("could not find bucket", "bucket", bucketKey) + continue } - // If there is a match, remove it from the collection and persist the - // resulting collection - if foundIdx != -1 { - bucket.Items[foundIdx] = bucket.Items[len(bucket.Items)-1] - bucket.Items = bucket.Items[:len(bucket.Items)-1] - if !bucketFound { - bucketCache[bucketKey] = bucket - } + uncompressedData, notCompressed, err := compressutil.Decompress(storageEntry.Value) + if err != nil { + return errwrap.Wrapf("failed to decompress packed storage value: {{err}}", err) + } + if notCompressed { + uncompressedData = storageEntry.Value } - newPctDone := idx * 100.0 / len(itemIDs) - if int(newPctDone) > pctDone { - pctDone = int(newPctDone) - logger.Trace("bucket item removal progress", "percent", pctDone, "items_removed", idx) + bucket := new(Bucket) + err = proto.Unmarshal(uncompressedData, bucket) + if err != nil { + return errwrap.Wrapf("failed decoding packed storage entry: {{err}}", err) } - } - logger.Debug("persisting buckets", "total_buckets", len(bucketCache)) + // Look for matching storage entries and delete them from the list. 
+ for i := 0; i < len(bucket.Items); i++ { + if _, ok := itemsToRemove[bucket.Items[i].ID]; ok { + bucket.Items[i] = bucket.Items[len(bucket.Items)-1] + bucket.Items = bucket.Items[:len(bucket.Items)-1] + + // Since we just moved a value to position i we need to + // decrement i so we replay this position + i-- + } + } - // Persist all buckets in the cache; these will be the ones that had - // deletions - pctDone = 0 - idx := 0 - for _, bucket := range bucketCache { // Fail if the context is canceled, the storage calls will fail anyways if ctx.Err() != nil { return ctx.Err() @@ -241,7 +211,7 @@ func (s *StoragePacker) DeleteMultipleItems(ctx context.Context, logger hclog.Lo return err } - newPctDone := idx * 100.0 / len(bucketCache) + newPctDone := idx * 100.0 / len(byBucket) if int(newPctDone) > pctDone { pctDone = int(newPctDone) logger.Trace("bucket persistence progress", "percent", pctDone, "buckets_persisted", idx) diff --git a/helper/storagepacker/storagepacker_test.go b/helper/storagepacker/storagepacker_test.go index ef7c1453a979..cc2448b2bc90 100644 --- a/helper/storagepacker/storagepacker_test.go +++ b/helper/storagepacker/storagepacker_test.go @@ -217,7 +217,7 @@ func TestStoragePacker_DeleteMultiple(t *testing.T) { itemsToDelete = append(itemsToDelete, fmt.Sprintf("item%d", i)) } - err = storagePacker.DeleteMultipleItems(ctx, nil, itemsToDelete...) 
+ err = storagePacker.DeleteMultipleItems(ctx, nil, itemsToDelete) if err != nil { t.Fatal(err) } @@ -237,3 +237,56 @@ func TestStoragePacker_DeleteMultiple(t *testing.T) { } } } + +func TestStoragePacker_DeleteMultiple_ALL(t *testing.T) { + storagePacker, err := NewStoragePacker(&logical.InmemStorage{}, log.New(&log.LoggerOptions{Name: "storagepackertest"}), "") + if err != nil { + t.Fatal(err) + } + + ctx := context.Background() + + // Persist a storage entry + itemsToDelete := make([]string, 0, 10000) + for i := 0; i < 10000; i++ { + item := &Item{ + ID: fmt.Sprintf("item%d", i), + } + + err = storagePacker.PutItem(ctx, item) + if err != nil { + t.Fatal(err) + } + + // Verify that it can be read + fetchedItem, err := storagePacker.GetItem(item.ID) + if err != nil { + t.Fatal(err) + } + if fetchedItem == nil { + t.Fatalf("failed to read the stored item") + } + + if item.ID != fetchedItem.ID { + t.Fatalf("bad: item ID; expected: %q\n actual: %q\n", item.ID, fetchedItem.ID) + } + + itemsToDelete = append(itemsToDelete, fmt.Sprintf("item%d", i)) + } + + err = storagePacker.DeleteMultipleItems(ctx, nil, itemsToDelete) + if err != nil { + t.Fatal(err) + } + + // Check that the deletion was successful + for _, item := range itemsToDelete { + fetchedItem, err := storagePacker.GetItem(item) + if err != nil { + t.Fatal(err) + } + if fetchedItem != nil { + t.Fatal("item not deleted") + } + } +} diff --git a/vault/identity_store_entities.go b/vault/identity_store_entities.go index 388b52fc446c..d1495790bda3 100644 --- a/vault/identity_store_entities.go +++ b/vault/identity_store_entities.go @@ -89,6 +89,21 @@ func entityPaths(i *IdentityStore) []*framework.Path { HelpSynopsis: strings.TrimSpace(entityHelp["entity-id"][0]), HelpDescription: strings.TrimSpace(entityHelp["entity-id"][1]), }, + { + Pattern: "entity/batch-delete", + Fields: map[string]*framework.FieldSchema{ + "entity_ids": { + Type: framework.TypeCommaStringSlice, + Description: "Entity IDs which needs to get 
merged", + }, + }, + Callbacks: map[logical.Operation]framework.OperationFunc{ + logical.UpdateOperation: i.handleEntityBatchDelete(), + }, + + HelpSynopsis: strings.TrimSpace(entityHelp["batch-delete"][0]), + HelpDescription: strings.TrimSpace(entityHelp["batch-delete"][1]), + }, { Pattern: "entity/name/?$", Callbacks: map[logical.Operation]framework.OperationFunc{ @@ -420,7 +435,7 @@ func (i *IdentityStore) pathEntityIDDelete() framework.OperationFunc { return nil, nil } - err = i.handleEntityDeleteCommon(ctx, txn, entity) + err = i.handleEntityDeleteCommon(ctx, txn, entity, true) if err != nil { return nil, err } @@ -464,7 +479,7 @@ func (i *IdentityStore) pathEntityNameDelete() framework.OperationFunc { return nil, nil } - err = i.handleEntityDeleteCommon(ctx, txn, entity) + err = i.handleEntityDeleteCommon(ctx, txn, entity, true) if err != nil { return nil, err } @@ -475,7 +490,79 @@ func (i *IdentityStore) pathEntityNameDelete() framework.OperationFunc { } } -func (i *IdentityStore) handleEntityDeleteCommon(ctx context.Context, txn *memdb.Txn, entity *identity.Entity) error { +// handleEntityBatchDelete deletes all of the entities for the given entity IDs +func (i *IdentityStore) handleEntityBatchDelete() framework.OperationFunc { + return func(ctx context.Context, req *logical.Request, d *framework.FieldData) (*logical.Response, error) { + entityIDs := d.Get("entity_ids").([]string) + if len(entityIDs) == 0 { + return logical.ErrorResponse("missing entity ids to merge from"), nil + } + + // Sort the ids by the bucket they will be deleted from + byBucket := make(map[string]map[string]struct{}) + for _, id := range entityIDs { + bucket, ok := byBucket[i.entityPacker.BucketKey(id)] + if !ok { + bucket = make(map[string]struct{}) + } + + bucket[id] = struct{}{} + byBucket[i.entityPacker.BucketKey(id)] = bucket + } + + deleteIdsForBucket := func(entityIDs []string) error { + i.lock.Lock() + defer i.lock.Unlock() + + // Create a MemDB transaction to delete entities + txn := 
i.db.Txn(true) + defer txn.Abort() + + for _, entityID := range entityIDs { + // Fetch the entity using its ID + entity, err := i.MemDBEntityByIDInTxn(txn, entityID, true) + if err != nil { + return err + } + if entity == nil { + continue + } + + err = i.handleEntityDeleteCommon(ctx, txn, entity, false) + if err != nil { + return err + } + } + + // Write all updates for this bucket. + err := i.entityPacker.DeleteMultipleItems(ctx, i.logger, entityIDs) + if err != nil { + return err + } + + txn.Commit() + return nil + } + + for _, bucket := range byBucket { + ids := make([]string, len(bucket)) + i := 0 + for id, _ := range bucket { + ids[i] = id + i++ + } + + err := deleteIdsForBucket(ids) + if err != nil { + return nil, err + } + } + + return nil, nil + } +} + +func (i *IdentityStore) handleEntityDeleteCommon(ctx context.Context, txn *memdb.Txn, entity *identity.Entity, update bool) error { ns, err := namespace.FromContext(ctx) if err != nil { return err @@ -511,10 +598,12 @@ func (i *IdentityStore) handleEntityDeleteCommon(ctx context.Context, txn *memdb return err } - // Delete the entity from storage - err = i.entityPacker.DeleteItem(ctx, entity.ID) - if err != nil { - return err + if update { + // Delete the entity from storage + err = i.entityPacker.DeleteItem(ctx, entity.ID) + if err != nil { + return err + } } return nil @@ -772,4 +861,8 @@ var entityHelp = map[string][2]string{ "Merge two or more entities together", "", }, + "batch-delete": { + "Delete all of the entities provided", + "", + }, } diff --git a/vault/identity_store_entities_test.go b/vault/identity_store_entities_test.go index 931164f592a9..6e4128d671ef 100644 --- a/vault/identity_store_entities_test.go +++ b/vault/identity_store_entities_test.go @@ -407,6 +407,58 @@ func TestIdentityStore_EntityCreateUpdate(t *testing.T) { } } +func TestIdentityStore_BatchDelete(t *testing.T) { + ctx := namespace.RootContext(nil) + is, _, _ := testIdentityStoreWithGithubAuth(ctx, t) + + ids := make([]string, 
10000) + for i := 0; i < 10000; i++ { + entityData := map[string]interface{}{ + "name": fmt.Sprintf("entity-%d", i), + } + + entityReq := &logical.Request{ + Operation: logical.UpdateOperation, + Path: "entity", + Data: entityData, + } + + // Create the entity + resp, err := is.HandleRequest(ctx, entityReq) + if err != nil || (resp != nil && resp.IsError()) { + t.Fatalf("err:%v resp:%#v", err, resp) + } + ids[i] = resp.Data["id"].(string) + } + + deleteReq := &logical.Request{ + Operation: logical.UpdateOperation, + Path: "entity/batch-delete", + Data: map[string]interface{}{ + "entity_ids": ids, + }, + } + + resp, err := is.HandleRequest(ctx, deleteReq) + if err != nil || (resp != nil && resp.IsError()) { + t.Fatalf("err:%v resp:%#v", err, resp) + } + + for _, entityID := range ids { + // Read the entity + resp, err := is.HandleRequest(ctx, &logical.Request{ + Operation: logical.ReadOperation, + Path: "entity/id/" + entityID, + }) + if err != nil || (resp != nil && resp.IsError()) { + t.Fatalf("err:%v resp:%#v", err, resp) + } + if resp != nil { + t.Fatal(resp) + } + } +} + func TestIdentityStore_CloneImmutability(t *testing.T) { alias := &identity.Alias{ ID: "testaliasid", diff --git a/website/pages/api-docs/secret/identity/entity.mdx b/website/pages/api-docs/secret/identity/entity.mdx index 1d90e20ca645..e3452e850b04 100644 --- a/website/pages/api-docs/secret/identity/entity.mdx +++ b/website/pages/api-docs/secret/identity/entity.mdx @@ -174,6 +174,44 @@ $ curl \ http://127.0.0.1:8200/v1/identity/entity/id/8d6a45e5-572f-8f13-d226-cd0d1ec57297 ``` +## Batch Delete Entities + +This endpoint deletes all entities provided. + +| Method | Path | +| :------- | :------------------------ | +| `POST` | `/identity/entity/batch-delete` | + +### Parameters + +- `entity_ids` `([]string: )` – List of entity identifiers. 
+ +### Sample Payload + +```json +{ + "entity_ids": [ + "02fe5a88-912b-6794-62ed-db873ef86a95", + "3bf81bc9-44df-8138-57f9-724a9ae36d04", + "627fba68-98c9-c012-71ba-bfb349585ce1", + "6c4c805b-b384-3d0e-4d51-44d349887b96", + "70a72feb-35d1-c775-0813-8efaa8b4b9b5", + "f1092a67-ce34-48fd-161d-c13a367bc1cd", + "faedd89a-0d82-c197-c8f9-93a3e6cf0cd0" + ] +} +``` + +### Sample Request + +```shell +$ curl \ + --header "X-Vault-Token: ..." \ + --request POST \ + --data @payload.json \ + http://127.0.0.1:8200/v1/identity/entity/batch-delete +``` + ## List Entities by ID This endpoint returns a list of available entities by their identifiers. From f1927352d3a0de7ba4e72b0a92aad42c3d9cfd49 Mon Sep 17 00:00:00 2001 From: Brian Kassouf Date: Mon, 20 Apr 2020 14:58:31 -0700 Subject: [PATCH 2/6] Update the parameter description --- vault/identity_store_entities.go | 2 +- website/pages/api-docs/secret/identity/entity.mdx | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/vault/identity_store_entities.go b/vault/identity_store_entities.go index d1495790bda3..a9d6cf96159e 100644 --- a/vault/identity_store_entities.go +++ b/vault/identity_store_entities.go @@ -94,7 +94,7 @@ func entityPaths(i *IdentityStore) []*framework.Path { Fields: map[string]*framework.FieldSchema{ "entity_ids": { Type: framework.TypeCommaStringSlice, - Description: "Entity IDs which needs to get merged", + Description: "Entity IDs to delete", }, }, Callbacks: map[logical.Operation]framework.OperationFunc{ diff --git a/website/pages/api-docs/secret/identity/entity.mdx b/website/pages/api-docs/secret/identity/entity.mdx index e3452e850b04..4511ef4003dc 100644 --- a/website/pages/api-docs/secret/identity/entity.mdx +++ b/website/pages/api-docs/secret/identity/entity.mdx @@ -184,7 +184,7 @@ This endpoint deletes all entities provided. ### Parameters -- `entity_ids` `([]string: )` – List of entity identifiers. +- `entity_ids` `([]string: )` – List of entity identifiers to delete. 
### Sample Payload From f8de8be41cf8ded71ed7104eefb131a914bc7689 Mon Sep 17 00:00:00 2001 From: Brian Kassouf Date: Mon, 20 Apr 2020 15:00:22 -0700 Subject: [PATCH 3/6] Update error message --- vault/identity_store_entities.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vault/identity_store_entities.go b/vault/identity_store_entities.go index a9d6cf96159e..966523a81877 100644 --- a/vault/identity_store_entities.go +++ b/vault/identity_store_entities.go @@ -495,7 +495,7 @@ func (i *IdentityStore) handleEntityBatchDelete() framework.OperationFunc { return func(ctx context.Context, req *logical.Request, d *framework.FieldData) (*logical.Response, error) { entityIDs := d.Get("entity_ids").([]string) if len(entityIDs) == 0 { - return logical.ErrorResponse("missing entity ids to merge from"), nil + return logical.ErrorResponse("missing entity ids to delete"), nil } // Sort the ids by the bucket they will be deleted from From a7b1175c0d9a7322f885f03689721c580d2b9994 Mon Sep 17 00:00:00 2001 From: Brian Kassouf Date: Tue, 21 Apr 2020 13:31:26 -0700 Subject: [PATCH 4/6] Update helper/storagepacker/storagepacker.go Co-Authored-By: Vishal Nayak --- helper/storagepacker/storagepacker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/helper/storagepacker/storagepacker.go b/helper/storagepacker/storagepacker.go index e26adf8c82d8..c6f091550c9a 100644 --- a/helper/storagepacker/storagepacker.go +++ b/helper/storagepacker/storagepacker.go @@ -137,7 +137,7 @@ func (s *StoragePacker) DeleteMultipleItems(ctx context.Context, logger hclog.Lo logger = hclog.NewNullLogger() } - // Sort the ids by the bucket the will be deleted from + // Sort the ids by the bucket they will be deleted from lockKeys := make([]string, 0) byBucket := make(map[string]map[string]struct{}) for _, id := range itemIDs { From d9ecd0f70de97490d8ca780efacd347dce7bd109 Mon Sep 17 00:00:00 2001 From: Brian Kassouf Date: Tue, 21 Apr 2020 14:45:13 -0700 Subject: [PATCH 5/6] 
Review feedback --- helper/storagepacker/storagepacker.go | 6 ++++-- vault/identity_store_entities.go | 6 ++++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/helper/storagepacker/storagepacker.go b/helper/storagepacker/storagepacker.go index c6f091550c9a..91ec08da9705 100644 --- a/helper/storagepacker/storagepacker.go +++ b/helper/storagepacker/storagepacker.go @@ -145,11 +145,13 @@ func (s *StoragePacker) DeleteMultipleItems(ctx context.Context, logger hclog.Lo bucket, ok := byBucket[bucketKey] if !ok { bucket = make(map[string]struct{}) + byBucket[bucketKey] = bucket + + // Add the lock key once + lockKeys = append(lockKeys, bucketKey) } bucket[id] = struct{}{} - byBucket[bucketKey] = bucket - lockKeys = append(lockKeys, bucketKey) } locks := locksutil.LocksForKeys(s.storageLocks, lockKeys) diff --git a/vault/identity_store_entities.go b/vault/identity_store_entities.go index 966523a81877..b4045ccc6fa0 100644 --- a/vault/identity_store_entities.go +++ b/vault/identity_store_entities.go @@ -501,13 +501,15 @@ func (i *IdentityStore) handleEntityBatchDelete() framework.OperationFunc { // Sort the ids by the bucket they will be deleted from byBucket := make(map[string]map[string]struct{}) for _, id := range entityIDs { - bucket, ok := byBucket[i.entityPacker.BucketKey(id)] + bucketKey := i.entityPacker.BucketKey(id) + + bucket, ok := byBucket[bucketKey] if !ok { bucket = make(map[string]struct{}) + byBucket[bucketKey] = bucket } bucket[id] = struct{}{} - byBucket[i.entityPacker.BucketKey(id)] = bucket } deleteIdsForBucket := func(entityIDs []string) error { From a8800e1504569512f9badf7fbba46dc9243ccc24 Mon Sep 17 00:00:00 2001 From: Brian Kassouf Date: Thu, 23 Apr 2020 15:20:46 -0700 Subject: [PATCH 6/6] Update vault/identity_store_entities.go Co-Authored-By: Calvin Leung Huang --- vault/identity_store_entities.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/vault/identity_store_entities.go b/vault/identity_store_entities.go 
index b4045ccc6fa0..4f23896158b9 100644 --- a/vault/identity_store_entities.go +++ b/vault/identity_store_entities.go @@ -516,7 +516,9 @@ func (i *IdentityStore) handleEntityBatchDelete() framework.OperationFunc { i.lock.Lock() defer i.lock.Unlock() - // Create a MemDB transaction to delete entities + // Create a MemDB transaction to delete entities from the inmem database + // without altering storage. Batch deletion on storage bucket items is + // performed directly through entityPacker. txn := i.db.Txn(true) defer txn.Abort()