Skip to content

Commit

Permalink
Add cache hit/miss to Autorefresh cache (flyteorg#43)
Browse files Browse the repository at this point in the history
* Add metrics and name for cache

* Mark autorefresh cache as deprecated

* Remove items from the workqueue after they're processed
  • Loading branch information
EngHabu authored Oct 17, 2019
1 parent 9a5bce7 commit 9d0448c
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 29 deletions.
81 changes: 55 additions & 26 deletions cache/auto_refresh.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ const (
// callbacks for create, refresh and delete item.
// The cache doesn't provide apis to update items.
type AutoRefresh interface {
// starts background refresh of items.
// Starts background refresh of items. To shutdown the cache, cancel the context.
Start(ctx context.Context) error

// Get item by id.
Expand All @@ -43,9 +43,12 @@ type AutoRefresh interface {
}

type metrics struct {
SystemErrors prometheus.Counter
Evictions prometheus.Counter
scope promutils.Scope
SyncErrors prometheus.Counter
Evictions prometheus.Counter
SyncLatency promutils.StopWatch
CacheHit prometheus.Counter
CacheMiss prometheus.Counter
scope promutils.Scope
}

type Item interface{}
Expand Down Expand Up @@ -104,6 +107,7 @@ func (i itemWrapper) GetItem() Item {
//
// Sync is run as a fixed-interval-scheduled-task, and is skipped if sync from previous cycle is still running.
type autoRefresh struct {
name string
metrics metrics
syncCb SyncFunc
createBatchesCb CreateBatchesFunc
Expand All @@ -130,9 +134,12 @@ func SingleItemBatches(_ context.Context, snapshot []ItemWrapper) (batches []Bat

func newMetrics(scope promutils.Scope) metrics {
return metrics{
Evictions: scope.MustNewCounter("lru_evictions", "Counter for evictions from LRU."),
SystemErrors: scope.MustNewCounter("sync_errors", "Counter for sync errors."),
scope: scope,
SyncErrors: scope.MustNewCounter("sync_errors", "Counter for sync errors."),
Evictions: scope.MustNewCounter("lru_evictions", "Counter for evictions from LRU."),
SyncLatency: scope.MustNewStopWatch("latency", "Latency for sync operations.", time.Millisecond),
CacheHit: scope.MustNewCounter("cache_hit", "Counter for cache hits."),
CacheMiss: scope.MustNewCounter("cache_miss", "Counter for cache misses."),
scope: scope,
}
}

Expand All @@ -143,35 +150,41 @@ func (w *autoRefresh) Start(ctx context.Context) error {
if err != nil {
logger.Errorf(ctx, "Failed to sync. Error: %v", err)
}
}(contextutils.WithGoroutineLabel(ctx, fmt.Sprintf("worker-%v", i)))
}(contextutils.WithGoroutineLabel(ctx, fmt.Sprintf("%v-worker-%v", w.name, i)))
}

enqueueCtx := contextutils.WithGoroutineLabel(ctx, fmt.Sprintf("%v-enqueue", w.name))

go wait.Until(func() {
err := w.enqueueBatches(ctx)
err := w.enqueueBatches(enqueueCtx)
if err != nil {
logger.Errorf(ctx, "Failed to sync. Error: %v", err)
logger.Errorf(enqueueCtx, "Failed to sync. Error: %v", err)
}
}, w.syncPeriod, ctx.Done())
}, w.syncPeriod, enqueueCtx.Done())

return nil
}

func (w *autoRefresh) Get(id ItemID) (Item, error) {
if val, ok := w.lruMap.Get(id); ok {
w.metrics.CacheHit.Inc()
return val.(Item), nil
}

w.metrics.CacheMiss.Inc()
return nil, errors.Errorf(ErrNotFound, "Item with id [%v] not found.", id)
}

// Return the item if exists else create it.
// Create should be invoked only once. recreating the object is not supported.
func (w *autoRefresh) GetOrCreate(id ItemID, item Item) (Item, error) {
if val, ok := w.lruMap.Get(id); ok {
w.metrics.CacheHit.Inc()
return val.(Item), nil
}

w.lruMap.Add(id, item)
w.metrics.CacheMiss.Inc()
return item, nil
}

Expand Down Expand Up @@ -218,28 +231,43 @@ func (w *autoRefresh) enqueueBatches(ctx context.Context) error {
// * Sync loop updates item 2, repeat
func (w *autoRefresh) sync(ctx context.Context) error {
for {
item, shutdown := w.workqueue.Get()
if shutdown {
select {
case <-ctx.Done():
return nil
}
default:
item, shutdown := w.workqueue.Get()
if shutdown {
return nil
}

updatedBatch, err := w.syncCb(ctx, *item.(*Batch))
if err != nil {
logger.Error(ctx, "failed to get latest copy of a batch. Error: %v", err)
continue
}
t := w.metrics.SyncLatency.Start()
updatedBatch, err := w.syncCb(ctx, *item.(*Batch))

// Since we create batches every time we sync, we will just remove the item from the queue here
// regardless of whether it succeeded the sync or not.
w.workqueue.Forget(item)
w.workqueue.Done(item)

if err != nil {
w.metrics.SyncErrors.Inc()
logger.Errorf(ctx, "failed to get latest copy of a batch. Error: %v", err)
t.Stop()
continue
}

for _, item := range updatedBatch {
if item.Action == Update {
// Add adds the item if it has been evicted or updates an existing one.
w.lruMap.Add(item.ID, item.Item)
for _, item := range updatedBatch {
if item.Action == Update {
// Add adds the item if it has been evicted or updates an existing one.
w.lruMap.Add(item.ID, item.Item)
}
}
t.Stop()
}
}
}

// Instantiates a new AutoRefresh Cache that syncs items in batches.
func NewAutoRefreshBatchedCache(createBatches CreateBatchesFunc, syncCb SyncFunc, syncRateLimiter workqueue.RateLimiter,
func NewAutoRefreshBatchedCache(name string, createBatches CreateBatchesFunc, syncCb SyncFunc, syncRateLimiter workqueue.RateLimiter,
resyncPeriod time.Duration, parallelizm, size int, scope promutils.Scope) (AutoRefresh, error) {

metrics := newMetrics(scope)
Expand All @@ -249,6 +277,7 @@ func NewAutoRefreshBatchedCache(createBatches CreateBatchesFunc, syncCb SyncFunc
}

cache := &autoRefresh{
name: name,
metrics: metrics,
parallelizm: parallelizm,
createBatchesCb: createBatches,
Expand All @@ -262,8 +291,8 @@ func NewAutoRefreshBatchedCache(createBatches CreateBatchesFunc, syncCb SyncFunc
}

// Instantiates a new AutoRefresh Cache that syncs items periodically.
func NewAutoRefreshCache(syncCb SyncFunc, syncRateLimiter workqueue.RateLimiter, resyncPeriod time.Duration,
func NewAutoRefreshCache(name string, syncCb SyncFunc, syncRateLimiter workqueue.RateLimiter, resyncPeriod time.Duration,
parallelizm, size int, scope promutils.Scope) (AutoRefresh, error) {

return NewAutoRefreshBatchedCache(SingleItemBatches, syncCb, syncRateLimiter, resyncPeriod, parallelizm, size, scope)
return NewAutoRefreshBatchedCache(name, SingleItemBatches, syncCb, syncRateLimiter, resyncPeriod, parallelizm, size, scope)
}
2 changes: 1 addition & 1 deletion cache/auto_refresh_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func ExampleNewAutoRefreshCache() {
// since cache refreshes itself asynchronously, it may not notice that an object has been deleted immediately,
// so users of the cache should have the delete logic aware of this shortcoming (eg. not-exists may be a valid
// error during removal if based on status in cache).
cache, err := NewAutoRefreshCache(syncItemCb, rateLimiter, resyncPeriod, 10, 100, promutils.NewTestScope())
cache, err := NewAutoRefreshCache("my-cache", syncItemCb, rateLimiter, resyncPeriod, 10, 100, promutils.NewTestScope())
if err != nil {
panic(err)
}
Expand Down
45 changes: 43 additions & 2 deletions cache/auto_refresh_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@ package cache
import (
"context"
"fmt"
"strconv"
"sync"
"testing"
"time"

"github.com/lyft/flytestdlib/atomic"

"k8s.io/client-go/util/workqueue"

"github.com/lyft/flytestdlib/errors"
Expand Down Expand Up @@ -48,7 +52,7 @@ func TestCacheTwo(t *testing.T) {

t.Run("normal operation", func(t *testing.T) {
// the size of the cache is at least as large as the number of items we're storing
cache, err := NewAutoRefreshCache(syncFakeItem, rateLimiter, testResyncPeriod, 10, 10, promutils.NewTestScope())
cache, err := NewAutoRefreshCache("fake1", syncFakeItem, rateLimiter, testResyncPeriod, 10, 10, promutils.NewTestScope())
assert.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -74,7 +78,7 @@ func TestCacheTwo(t *testing.T) {

t.Run("Not Found", func(t *testing.T) {
// the size of the cache is at least as large as the number of items we're storing
cache, err := NewAutoRefreshCache(syncFakeItem, rateLimiter, testResyncPeriod, 10, 2, promutils.NewTestScope())
cache, err := NewAutoRefreshCache("fake2", syncFakeItem, rateLimiter, testResyncPeriod, 10, 2, promutils.NewTestScope())
assert.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -101,3 +105,40 @@ func TestCacheTwo(t *testing.T) {
cancel()
})
}

func TestQueueBuildUp(t *testing.T) {
testResyncPeriod := time.Hour
rateLimiter := workqueue.DefaultControllerRateLimiter()

syncCount := atomic.NewInt32(0)
m := sync.Map{}
alwaysFailing := func(ctx context.Context, batch Batch) (
updatedBatch []ItemSyncResponse, err error) {
assert.Len(t, batch, 1)
_, existing := m.LoadOrStore(batch[0].GetID(), 0)
assert.False(t, existing, "Saw %v before", batch[0].GetID())
if existing {
t.FailNow()
}

syncCount.Inc()
return nil, fmt.Errorf("expected error")
}

size := 100
cache, err := NewAutoRefreshCache("fake2", alwaysFailing, rateLimiter, testResyncPeriod, 10, size, promutils.NewTestScope())
assert.NoError(t, err)

ctx := context.Background()
ctx, cancelNow := context.WithCancel(ctx)
defer cancelNow()

for i := 0; i < size; i++ {
_, err := cache.GetOrCreate(strconv.Itoa(i), "test")
assert.NoError(t, err)
}

assert.NoError(t, cache.Start(ctx))
time.Sleep(5 * time.Second)
assert.Equal(t, int32(size), syncCount.Load())
}
5 changes: 5 additions & 0 deletions utils/auto_refresh_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
// AutoRefreshCache with regular GetOrCreate and Delete along with background asynchronous refresh. Caller provides
// callbacks for create, refresh and delete item.
// The cache doesn't provide apis to update items.
// Deprecated: This utility is deprecated, it has been refactored and moved into `cache` package.
type AutoRefreshCache interface {
// starts background refresh of items
Start(ctx context.Context)
Expand All @@ -27,11 +28,13 @@ type AutoRefreshCache interface {
GetOrCreate(item CacheItem) (CacheItem, error)
}

// Deprecated: This utility is deprecated, it has been refactored and moved into `cache` package.
type CacheItem interface {
ID() string
}

// Possible actions for the cache to take as a result of running the sync function on any given cache item
// Deprecated: This utility is deprecated, it has been refactored and moved into `cache` package.
type CacheSyncAction int

const (
Expand All @@ -48,6 +51,7 @@ const (
// 1. The new CacheItem, and
// 2. What action should be taken. The sync function has no insight into your object, and needs to be
// told explicitly if the new item is different from the old one.
// Deprecated: This utility is deprecated, it has been refactored and moved into `cache` package.
type CacheSyncItem func(ctx context.Context, obj CacheItem) (
newItem CacheItem, result CacheSyncAction, err error)

Expand All @@ -57,6 +61,7 @@ func getEvictionFunction(counter prometheus.Counter) func(key interface{}, value
}
}

// Deprecated: This utility is deprecated, it has been refactored and moved into `cache` package.
func NewAutoRefreshCache(syncCb CacheSyncItem, syncRateLimiter RateLimiter, resyncPeriod time.Duration,
size int, scope promutils.Scope) (AutoRefreshCache, error) {

Expand Down

0 comments on commit 9d0448c

Please sign in to comment.