Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Support DeleteDelayed operation in AutoRefresh cache #98

Merged
merged 1 commit into from
Sep 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion cache/auto_refresh.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ type AutoRefresh interface {

// Get object if exists else create it.
GetOrCreate(id ItemID, item Item) (Item, error)

// DeleteDelayed queues an item for deletion. It Will get deleted as part of the next Sync cycle. Until the next sync
// cycle runs, Get and GetOrCreate will continue to return the Item in its previous state.
DeleteDelayed(id ItemID) error
}

type metrics struct {
Expand Down Expand Up @@ -113,6 +117,7 @@ type autoRefresh struct {
syncCb SyncFunc
createBatchesCb CreateBatchesFunc
lruMap *lru.Cache
toDelete *syncSet
syncPeriod time.Duration
workqueue workqueue.RateLimitingInterface
parallelizm int
Expand Down Expand Up @@ -190,6 +195,13 @@ func (w *autoRefresh) GetOrCreate(id ItemID, item Item) (Item, error) {
return item, nil
}

// DeleteDelayed queues an item for deletion. It Will get deleted as part of the next Sync cycle. Until the next sync
// cycle runs, Get and GetOrCreate will continue to return the Item in its previous state.
func (w *autoRefresh) DeleteDelayed(id ItemID) error {
w.toDelete.Insert(id)
return nil
}

// This function is called internally by its own timer. Roughly, it will,
// - List keys
// - Create batches of keys based on createBatchesCb
Expand All @@ -202,7 +214,7 @@ func (w *autoRefresh) enqueueBatches(ctx context.Context) error {
for _, k := range keys {
// If not ok, it means evicted between the item was evicted between getting the keys and this update loop
// which is fine, we can just ignore.
if value, ok := w.lruMap.Peek(k); ok {
if value, ok := w.lruMap.Peek(k); ok && !w.toDelete.Contains(k) {
snapshot = append(snapshot, itemWrapper{
id: k.(ItemID),
item: value.(Item),
Expand Down Expand Up @@ -282,6 +294,12 @@ func (w *autoRefresh) sync(ctx context.Context) (err error) {
}
}

w.toDelete.Range(func(key interface{}) bool {
w.lruMap.Remove(key)
w.toDelete.Remove(key)
return true
})

t.Stop()
}
}
Expand All @@ -304,6 +322,7 @@ func NewAutoRefreshBatchedCache(name string, createBatches CreateBatchesFunc, sy
createBatchesCb: createBatches,
syncCb: syncCb,
lruMap: lruCache,
toDelete: newSyncSet(),
syncPeriod: resyncPeriod,
workqueue: workqueue.NewNamedRateLimitingQueue(syncRateLimiter, scope.CurrentScope()),
}
Expand Down
40 changes: 40 additions & 0 deletions cache/sync_set.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package cache

import "sync"

var emptyVal = struct{}{}

// syncSet is a thread-safe Set.
type syncSet struct {
underlying sync.Map
}

// Contains checks if the key is present in the set.
func (s *syncSet) Contains(key interface{}) bool {
_, found := s.underlying.Load(key)
return found
}

// Insert adds a new key to the set if it doesn't already exist.
func (s *syncSet) Insert(key interface{}) {
s.underlying.Store(key, emptyVal)
}

// Remove deletes a key from the set.
func (s *syncSet) Remove(key interface{}) {
s.underlying.Delete(key)
}

// Range allows iterating over the set. Deleting the key while iterating is a supported operation.
func (s *syncSet) Range(callback func(key interface{}) bool) {
s.underlying.Range(func(key, value interface{}) bool {
return callback(key)
})
}

// newSyncSet initializes a new thread-safe set.
func newSyncSet() *syncSet {
return &syncSet{
underlying: sync.Map{},
}
}
49 changes: 49 additions & 0 deletions cache/sync_set_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package cache

import (
"testing"

"github.com/stretchr/testify/assert"
)

func rangeAndRemove(tb testing.TB, s *syncSet, count int) {
for i := 0; i < count; i++ {
s.Insert(i)
}

s.Range(func(key interface{}) bool {
s.Remove(key)
return true
})

for i := 0; i < count; i++ {
assert.False(tb, s.Contains(i))
}
}

func TestSyncSet_Range(t *testing.T) {
s := newSyncSet()
rangeAndRemove(t, s, 1000)
}

func BenchmarkSyncSet_Range(b *testing.B) {
s := newSyncSet()
rangeAndRemove(b, s, b.N)
}

func TestSyncSet_Contains(t *testing.T) {
s := newSyncSet()
count := 1000
for i := 0; i < count; i++ {
s.Insert(i)
}

for i := 0; i < count; i++ {
assert.True(t, s.Contains(i))
s.Remove(i)
}

for i := 0; i < count; i++ {
assert.False(t, s.Contains(i))
}
}