Skip to content

Commit

Permalink
Support DeleteDelayed operation in AutoRefresh cache (flyteorg#98)
Browse files Browse the repository at this point in the history
Signed-off-by: Haytham Abuelfutuh <[email protected]>
  • Loading branch information
EngHabu authored Sep 21, 2021
1 parent 3191899 commit dcc4d89
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 1 deletion.
21 changes: 20 additions & 1 deletion cache/auto_refresh.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ type AutoRefresh interface {

// Get object if exists else create it.
GetOrCreate(id ItemID, item Item) (Item, error)

// DeleteDelayed queues an item for deletion. It Will get deleted as part of the next Sync cycle. Until the next sync
// cycle runs, Get and GetOrCreate will continue to return the Item in its previous state.
DeleteDelayed(id ItemID) error
}

type metrics struct {
Expand Down Expand Up @@ -113,6 +117,7 @@ type autoRefresh struct {
syncCb SyncFunc
createBatchesCb CreateBatchesFunc
lruMap *lru.Cache
toDelete *syncSet
syncPeriod time.Duration
workqueue workqueue.RateLimitingInterface
parallelizm int
Expand Down Expand Up @@ -190,6 +195,13 @@ func (w *autoRefresh) GetOrCreate(id ItemID, item Item) (Item, error) {
return item, nil
}

// DeleteDelayed queues an item for deletion. It Will get deleted as part of the next Sync cycle. Until the next sync
// cycle runs, Get and GetOrCreate will continue to return the Item in its previous state.
func (w *autoRefresh) DeleteDelayed(id ItemID) error {
w.toDelete.Insert(id)
return nil
}

// This function is called internally by its own timer. Roughly, it will,
// - List keys
// - Create batches of keys based on createBatchesCb
Expand All @@ -202,7 +214,7 @@ func (w *autoRefresh) enqueueBatches(ctx context.Context) error {
for _, k := range keys {
// If not ok, it means evicted between the item was evicted between getting the keys and this update loop
// which is fine, we can just ignore.
if value, ok := w.lruMap.Peek(k); ok {
if value, ok := w.lruMap.Peek(k); ok && !w.toDelete.Contains(k) {
snapshot = append(snapshot, itemWrapper{
id: k.(ItemID),
item: value.(Item),
Expand Down Expand Up @@ -282,6 +294,12 @@ func (w *autoRefresh) sync(ctx context.Context) (err error) {
}
}

w.toDelete.Range(func(key interface{}) bool {
w.lruMap.Remove(key)
w.toDelete.Remove(key)
return true
})

t.Stop()
}
}
Expand All @@ -304,6 +322,7 @@ func NewAutoRefreshBatchedCache(name string, createBatches CreateBatchesFunc, sy
createBatchesCb: createBatches,
syncCb: syncCb,
lruMap: lruCache,
toDelete: newSyncSet(),
syncPeriod: resyncPeriod,
workqueue: workqueue.NewNamedRateLimitingQueue(syncRateLimiter, scope.CurrentScope()),
}
Expand Down
40 changes: 40 additions & 0 deletions cache/sync_set.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package cache

import "sync"

var emptyVal = struct{}{}

// syncSet is a thread-safe Set.
type syncSet struct {
underlying sync.Map
}

// Contains checks if the key is present in the set.
func (s *syncSet) Contains(key interface{}) bool {
_, found := s.underlying.Load(key)
return found
}

// Insert adds a new key to the set if it doesn't already exist.
func (s *syncSet) Insert(key interface{}) {
s.underlying.Store(key, emptyVal)
}

// Remove deletes a key from the set.
func (s *syncSet) Remove(key interface{}) {
s.underlying.Delete(key)
}

// Range allows iterating over the set. Deleting the key while iterating is a supported operation.
func (s *syncSet) Range(callback func(key interface{}) bool) {
s.underlying.Range(func(key, value interface{}) bool {
return callback(key)
})
}

// newSyncSet initializes a new thread-safe set.
func newSyncSet() *syncSet {
return &syncSet{
underlying: sync.Map{},
}
}
49 changes: 49 additions & 0 deletions cache/sync_set_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package cache

import (
"testing"

"github.com/stretchr/testify/assert"
)

func rangeAndRemove(tb testing.TB, s *syncSet, count int) {
for i := 0; i < count; i++ {
s.Insert(i)
}

s.Range(func(key interface{}) bool {
s.Remove(key)
return true
})

for i := 0; i < count; i++ {
assert.False(tb, s.Contains(i))
}
}

func TestSyncSet_Range(t *testing.T) {
s := newSyncSet()
rangeAndRemove(t, s, 1000)
}

func BenchmarkSyncSet_Range(b *testing.B) {
s := newSyncSet()
rangeAndRemove(b, s, b.N)
}

func TestSyncSet_Contains(t *testing.T) {
s := newSyncSet()
count := 1000
for i := 0; i < count; i++ {
s.Insert(i)
}

for i := 0; i < count; i++ {
assert.True(t, s.Contains(i))
s.Remove(i)
}

for i := 0; i < count; i++ {
assert.False(t, s.Contains(i))
}
}

0 comments on commit dcc4d89

Please sign in to comment.