From e3b48f1cd432618d8e8989a112de6917d3ebb917 Mon Sep 17 00:00:00 2001 From: Haytham Abuelfutuh Date: Tue, 21 Sep 2021 16:06:34 -0700 Subject: [PATCH] Support DeleteDelayed operation in AutoRefresh cache Signed-off-by: Haytham Abuelfutuh --- cache/auto_refresh.go | 21 +++++++++++++++++- cache/sync_set.go | 40 ++++++++++++++++++++++++++++++++++ cache/sync_set_test.go | 49 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 109 insertions(+), 1 deletion(-) create mode 100644 cache/sync_set.go create mode 100644 cache/sync_set_test.go diff --git a/cache/auto_refresh.go b/cache/auto_refresh.go index 6360127d..09a85ae3 100644 --- a/cache/auto_refresh.go +++ b/cache/auto_refresh.go @@ -40,6 +40,10 @@ type AutoRefresh interface { // Get object if exists else create it. GetOrCreate(id ItemID, item Item) (Item, error) + + // DeleteDelayed queues an item for deletion. It Will get deleted as part of the next Sync cycle. Until the next sync + // cycle runs, Get and GetOrCreate will continue to return the Item in its previous state. + DeleteDelayed(id ItemID) error } type metrics struct { @@ -113,6 +117,7 @@ type autoRefresh struct { syncCb SyncFunc createBatchesCb CreateBatchesFunc lruMap *lru.Cache + toDelete *syncSet syncPeriod time.Duration workqueue workqueue.RateLimitingInterface parallelizm int @@ -190,6 +195,13 @@ func (w *autoRefresh) GetOrCreate(id ItemID, item Item) (Item, error) { return item, nil } +// DeleteDelayed queues an item for deletion. It Will get deleted as part of the next Sync cycle. Until the next sync +// cycle runs, Get and GetOrCreate will continue to return the Item in its previous state. +func (w *autoRefresh) DeleteDelayed(id ItemID) error { + w.toDelete.Insert(id) + return nil +} + // This function is called internally by its own timer. Roughly, it will, // - List keys // - Create batches of keys based on createBatchesCb @@ -202,7 +214,7 @@ func (w *autoRefresh) enqueueBatches(ctx context.Context) error { for _, k := range keys { // If not ok, it means evicted between the item was evicted between getting the keys and this update loop // which is fine, we can just ignore. - if value, ok := w.lruMap.Peek(k); ok { + if value, ok := w.lruMap.Peek(k); ok && !w.toDelete.Contains(k) { snapshot = append(snapshot, itemWrapper{ id: k.(ItemID), item: value.(Item), @@ -282,6 +294,12 @@ func (w *autoRefresh) sync(ctx context.Context) (err error) { } } + w.toDelete.Range(func(key interface{}) bool { + w.lruMap.Remove(key) + w.toDelete.Remove(key) + return true + }) + t.Stop() } } @@ -304,6 +322,7 @@ func NewAutoRefreshBatchedCache(name string, createBatches CreateBatchesFunc, sy createBatchesCb: createBatches, syncCb: syncCb, lruMap: lruCache, + toDelete: newSyncSet(), syncPeriod: resyncPeriod, workqueue: workqueue.NewNamedRateLimitingQueue(syncRateLimiter, scope.CurrentScope()), } diff --git a/cache/sync_set.go b/cache/sync_set.go new file mode 100644 index 00000000..211b4f10 --- /dev/null +++ b/cache/sync_set.go @@ -0,0 +1,40 @@ +package cache + +import "sync" + +var emptyVal = struct{}{} + +// syncSet is a thread-safe Set. +type syncSet struct { + underlying sync.Map +} + +// Contains checks if the key is present in the set. +func (s *syncSet) Contains(key interface{}) bool { + _, found := s.underlying.Load(key) + return found +} + +// Insert adds a new key to the set if it doesn't already exist. +func (s *syncSet) Insert(key interface{}) { + s.underlying.Store(key, emptyVal) +} + +// Remove deletes a key from the set. +func (s *syncSet) Remove(key interface{}) { + s.underlying.Delete(key) +} + +// Range allows iterating over the set. Deleting the key while iterating is a supported operation. +func (s *syncSet) Range(callback func(key interface{}) bool) { + s.underlying.Range(func(key, value interface{}) bool { + return callback(key) + }) +} + +// newSyncSet initializes a new thread-safe set. +func newSyncSet() *syncSet { + return &syncSet{ + underlying: sync.Map{}, + } +} diff --git a/cache/sync_set_test.go b/cache/sync_set_test.go new file mode 100644 index 00000000..5a80138d --- /dev/null +++ b/cache/sync_set_test.go @@ -0,0 +1,49 @@ +package cache + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func rangeAndRemove(tb testing.TB, s *syncSet, count int) { + for i := 0; i < count; i++ { + s.Insert(i) + } + + s.Range(func(key interface{}) bool { + s.Remove(key) + return true + }) + + for i := 0; i < count; i++ { + assert.False(tb, s.Contains(i)) + } +} + +func TestSyncSet_Range(t *testing.T) { + s := newSyncSet() + rangeAndRemove(t, s, 1000) +} + +func BenchmarkSyncSet_Range(b *testing.B) { + s := newSyncSet() + rangeAndRemove(b, s, b.N) +} + +func TestSyncSet_Contains(t *testing.T) { + s := newSyncSet() + count := 1000 + for i := 0; i < count; i++ { + s.Insert(i) + } + + for i := 0; i < count; i++ { + assert.True(t, s.Contains(i)) + s.Remove(i) + } + + for i := 0; i < count; i++ { + assert.False(t, s.Contains(i)) + } +}