diff --git a/cache/auto_refresh.go b/cache/auto_refresh.go index e07d54f7d5..b9298a384e 100644 --- a/cache/auto_refresh.go +++ b/cache/auto_refresh.go @@ -32,7 +32,7 @@ const ( // callbacks for create, refresh and delete item. // The cache doesn't provide apis to update items. type AutoRefresh interface { - // starts background refresh of items. + // Starts background refresh of items. To shutdown the cache, cancel the context. Start(ctx context.Context) error // Get item by id. @@ -43,9 +43,12 @@ type AutoRefresh interface { } type metrics struct { - SystemErrors prometheus.Counter - Evictions prometheus.Counter - scope promutils.Scope + SyncErrors prometheus.Counter + Evictions prometheus.Counter + SyncLatency promutils.StopWatch + CacheHit prometheus.Counter + CacheMiss prometheus.Counter + scope promutils.Scope } type Item interface{} @@ -104,6 +107,7 @@ func (i itemWrapper) GetItem() Item { // // Sync is run as a fixed-interval-scheduled-task, and is skipped if sync from previous cycle is still running. type autoRefresh struct { + name string metrics metrics syncCb SyncFunc createBatchesCb CreateBatchesFunc @@ -130,9 +134,12 @@ func SingleItemBatches(_ context.Context, snapshot []ItemWrapper) (batches []Bat func newMetrics(scope promutils.Scope) metrics { return metrics{ - Evictions: scope.MustNewCounter("lru_evictions", "Counter for evictions from LRU."), - SystemErrors: scope.MustNewCounter("sync_errors", "Counter for sync errors."), - scope: scope, + SyncErrors: scope.MustNewCounter("sync_errors", "Counter for sync errors."), + Evictions: scope.MustNewCounter("lru_evictions", "Counter for evictions from LRU."), + SyncLatency: scope.MustNewStopWatch("latency", "Latency for sync operations.", time.Millisecond), + CacheHit: scope.MustNewCounter("cache_hit", "Counter for cache hits."), + CacheMiss: scope.MustNewCounter("cache_miss", "Counter for cache misses."), + scope: scope, } } @@ -143,24 +150,28 @@ func (w *autoRefresh) Start(ctx context.Context) error { if err != nil { logger.Errorf(ctx, "Failed to sync. Error: %v", err) } - }(contextutils.WithGoroutineLabel(ctx, fmt.Sprintf("worker-%v", i))) + }(contextutils.WithGoroutineLabel(ctx, fmt.Sprintf("%v-worker-%v", w.name, i))) } + enqueueCtx := contextutils.WithGoroutineLabel(ctx, fmt.Sprintf("%v-enqueue", w.name)) + go wait.Until(func() { - err := w.enqueueBatches(ctx) + err := w.enqueueBatches(enqueueCtx) if err != nil { - logger.Errorf(ctx, "Failed to sync. Error: %v", err) + logger.Errorf(enqueueCtx, "Failed to sync. Error: %v", err) } - }, w.syncPeriod, ctx.Done()) + }, w.syncPeriod, enqueueCtx.Done()) return nil } func (w *autoRefresh) Get(id ItemID) (Item, error) { if val, ok := w.lruMap.Get(id); ok { + w.metrics.CacheHit.Inc() return val.(Item), nil } + w.metrics.CacheMiss.Inc() return nil, errors.Errorf(ErrNotFound, "Item with id [%v] not found.", id) } @@ -168,10 +179,12 @@ func (w *autoRefresh) Get(id ItemID) (Item, error) { // Create should be invoked only once. recreating the object is not supported. func (w *autoRefresh) GetOrCreate(id ItemID, item Item) (Item, error) { if val, ok := w.lruMap.Get(id); ok { + w.metrics.CacheHit.Inc() return val.(Item), nil } w.lruMap.Add(id, item) + w.metrics.CacheMiss.Inc() return item, nil } @@ -218,28 +231,43 @@ func (w *autoRefresh) enqueueBatches(ctx context.Context) error { // * Sync loop updates item 2, repeat func (w *autoRefresh) sync(ctx context.Context) error { for { - item, shutdown := w.workqueue.Get() - if shutdown { + select { + case <-ctx.Done(): return nil - } + default: + item, shutdown := w.workqueue.Get() + if shutdown { + return nil + } - updatedBatch, err := w.syncCb(ctx, *item.(*Batch)) - if err != nil { - logger.Error(ctx, "failed to get latest copy of a batch. Error: %v", err) - continue - } + t := w.metrics.SyncLatency.Start() + updatedBatch, err := w.syncCb(ctx, *item.(*Batch)) + + // Since we create batches every time we sync, we will just remove the item from the queue here + // regardless of whether it succeeded the sync or not. + w.workqueue.Forget(item) + w.workqueue.Done(item) + + if err != nil { + w.metrics.SyncErrors.Inc() + logger.Errorf(ctx, "failed to get latest copy of a batch. Error: %v", err) + t.Stop() + continue + } - for _, item := range updatedBatch { - if item.Action == Update { - // Add adds the item if it has been evicted or updates an existing one. - w.lruMap.Add(item.ID, item.Item) + for _, item := range updatedBatch { + if item.Action == Update { + // Add adds the item if it has been evicted or updates an existing one. + w.lruMap.Add(item.ID, item.Item) + } } + t.Stop() } } } // Instantiates a new AutoRefresh Cache that syncs items in batches. -func NewAutoRefreshBatchedCache(createBatches CreateBatchesFunc, syncCb SyncFunc, syncRateLimiter workqueue.RateLimiter, +func NewAutoRefreshBatchedCache(name string, createBatches CreateBatchesFunc, syncCb SyncFunc, syncRateLimiter workqueue.RateLimiter, resyncPeriod time.Duration, parallelizm, size int, scope promutils.Scope) (AutoRefresh, error) { metrics := newMetrics(scope) @@ -249,6 +277,7 @@ func NewAutoRefreshBatchedCache(createBatches CreateBatchesFunc, syncCb SyncFunc } cache := &autoRefresh{ + name: name, metrics: metrics, parallelizm: parallelizm, createBatchesCb: createBatches, @@ -262,8 +291,8 @@ func NewAutoRefreshBatchedCache(createBatches CreateBatchesFunc, syncCb SyncFunc } // Instantiates a new AutoRefresh Cache that syncs items periodically. -func NewAutoRefreshCache(syncCb SyncFunc, syncRateLimiter workqueue.RateLimiter, resyncPeriod time.Duration, +func NewAutoRefreshCache(name string, syncCb SyncFunc, syncRateLimiter workqueue.RateLimiter, resyncPeriod time.Duration, parallelizm, size int, scope promutils.Scope) (AutoRefresh, error) { - return NewAutoRefreshBatchedCache(SingleItemBatches, syncCb, syncRateLimiter, resyncPeriod, parallelizm, size, scope) + return NewAutoRefreshBatchedCache(name, SingleItemBatches, syncCb, syncRateLimiter, resyncPeriod, parallelizm, size, scope) } diff --git a/cache/auto_refresh_example_test.go b/cache/auto_refresh_example_test.go index 1b5873c7ee..27fbec281e 100644 --- a/cache/auto_refresh_example_test.go +++ b/cache/auto_refresh_example_test.go @@ -88,7 +88,7 @@ func ExampleNewAutoRefreshCache() { // since cache refreshes itself asynchronously, it may not notice that an object has been deleted immediately, // so users of the cache should have the delete logic aware of this shortcoming (eg. not-exists may be a valid // error during removal if based on status in cache). - cache, err := NewAutoRefreshCache(syncItemCb, rateLimiter, resyncPeriod, 10, 100, promutils.NewTestScope()) + cache, err := NewAutoRefreshCache("my-cache", syncItemCb, rateLimiter, resyncPeriod, 10, 100, promutils.NewTestScope()) if err != nil { panic(err) } diff --git a/cache/auto_refresh_test.go b/cache/auto_refresh_test.go index 77723e068e..b5df48883d 100644 --- a/cache/auto_refresh_test.go +++ b/cache/auto_refresh_test.go @@ -3,9 +3,13 @@ package cache import ( "context" "fmt" + "strconv" + "sync" "testing" "time" + "github.com/lyft/flytestdlib/atomic" + "k8s.io/client-go/util/workqueue" "github.com/lyft/flytestdlib/errors" @@ -48,7 +52,7 @@ func TestCacheTwo(t *testing.T) { t.Run("normal operation", func(t *testing.T) { // the size of the cache is at least as large as the number of items we're storing - cache, err := NewAutoRefreshCache(syncFakeItem, rateLimiter, testResyncPeriod, 10, 10, promutils.NewTestScope()) + cache, err := NewAutoRefreshCache("fake1", syncFakeItem, rateLimiter, testResyncPeriod, 10, 10, promutils.NewTestScope()) assert.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) @@ -74,7 +78,7 @@ func TestCacheTwo(t *testing.T) { t.Run("Not Found", func(t *testing.T) { // the size of the cache is at least as large as the number of items we're storing - cache, err := NewAutoRefreshCache(syncFakeItem, rateLimiter, testResyncPeriod, 10, 2, promutils.NewTestScope()) + cache, err := NewAutoRefreshCache("fake2", syncFakeItem, rateLimiter, testResyncPeriod, 10, 2, promutils.NewTestScope()) assert.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) @@ -101,3 +105,40 @@ func TestCacheTwo(t *testing.T) { cancel() }) } + +func TestQueueBuildUp(t *testing.T) { + testResyncPeriod := time.Hour + rateLimiter := workqueue.DefaultControllerRateLimiter() + + syncCount := atomic.NewInt32(0) + m := sync.Map{} + alwaysFailing := func(ctx context.Context, batch Batch) ( + updatedBatch []ItemSyncResponse, err error) { + assert.Len(t, batch, 1) + _, existing := m.LoadOrStore(batch[0].GetID(), 0) + assert.False(t, existing, "Saw %v before", batch[0].GetID()) + if existing { + t.FailNow() + } + + syncCount.Inc() + return nil, fmt.Errorf("expected error") + } + + size := 100 + cache, err := NewAutoRefreshCache("fake2", alwaysFailing, rateLimiter, testResyncPeriod, 10, size, promutils.NewTestScope()) + assert.NoError(t, err) + + ctx := context.Background() + ctx, cancelNow := context.WithCancel(ctx) + defer cancelNow() + + for i := 0; i < size; i++ { + _, err := cache.GetOrCreate(strconv.Itoa(i), "test") + assert.NoError(t, err) + } + + assert.NoError(t, cache.Start(ctx)) + time.Sleep(5 * time.Second) + assert.Equal(t, int32(size), syncCount.Load()) +} diff --git a/utils/auto_refresh_cache.go b/utils/auto_refresh_cache.go index 9a72d22105..e9c22e2ed5 100644 --- a/utils/auto_refresh_cache.go +++ b/utils/auto_refresh_cache.go @@ -16,6 +16,7 @@ import ( // AutoRefreshCache with regular GetOrCreate and Delete along with background asynchronous refresh. Caller provides // callbacks for create, refresh and delete item. // The cache doesn't provide apis to update items. +// Deprecated: This utility is deprecated, it has been refactored and moved into `cache` package. type AutoRefreshCache interface { // starts background refresh of items Start(ctx context.Context) @@ -27,11 +28,13 @@ type AutoRefreshCache interface { GetOrCreate(item CacheItem) (CacheItem, error) } +// Deprecated: This utility is deprecated, it has been refactored and moved into `cache` package. type CacheItem interface { ID() string } // Possible actions for the cache to take as a result of running the sync function on any given cache item +// Deprecated: This utility is deprecated, it has been refactored and moved into `cache` package. type CacheSyncAction int const ( @@ -48,6 +51,7 @@ const ( // 1. The new CacheItem, and // 2. What action should be taken. The sync function has no insight into your object, and needs to be // told explicitly if the new item is different from the old one. +// Deprecated: This utility is deprecated, it has been refactored and moved into `cache` package. type CacheSyncItem func(ctx context.Context, obj CacheItem) ( newItem CacheItem, result CacheSyncAction, err error) @@ -57,6 +61,7 @@ func getEvictionFunction(counter prometheus.Counter) func(key interface{}, value } } +// Deprecated: This utility is deprecated, it has been refactored and moved into `cache` package. func NewAutoRefreshCache(syncCb CacheSyncItem, syncRateLimiter RateLimiter, resyncPeriod time.Duration, size int, scope promutils.Scope) (AutoRefreshCache, error) {