Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Add item ID to the workqueue instead #165

Closed
wants to merge 18 commits into from
25 changes: 19 additions & 6 deletions cache/auto_refresh.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,10 @@
}

for _, batch := range batches {
b := batch
w.workqueue.Add(&b)
for _, b := range batch {
logger.Debugf(ctx, "Enqueuing batch with id: %v", b.GetID())
w.workqueue.Add(b.GetID())
}
}

return nil
Expand Down Expand Up @@ -273,18 +275,29 @@
case <-ctx.Done():
return nil
default:
item, shutdown := w.workqueue.Get()
itemID, shutdown := w.workqueue.Get()
if shutdown {
logger.Debugf(ctx, "Shutting down worker")

Check warning on line 280 in cache/auto_refresh.go

View check run for this annotation

Codecov / codecov/patch

cache/auto_refresh.go#L280

Added line #L280 was not covered by tests
return nil
}

t := w.metrics.SyncLatency.Start()
updatedBatch, err := w.syncCb(ctx, *item.(*Batch))
logger.Debugf(ctx, "Syncing item with id [%v]", itemID)
item, ok := w.lruMap.Get(itemID)
if !ok {
logger.Debugf(ctx, "item with id [%v] not found in cache", itemID)
t.Stop()
continue

Check warning on line 290 in cache/auto_refresh.go

View check run for this annotation

Codecov / codecov/patch

cache/auto_refresh.go#L288-L290

Added lines #L288 - L290 were not covered by tests
}
updatedBatch, err := w.syncCb(ctx, Batch{itemWrapper{
id: itemID.(ItemID),
item: item.(Item),
}})

// Since we create batches every time we sync, we will just remove the item from the queue here
// regardless of whether it succeeded the sync or not.
w.workqueue.Forget(item)
w.workqueue.Done(item)
w.workqueue.Forget(itemID)
w.workqueue.Done(itemID)

if err != nil {
w.metrics.SyncErrors.Inc()
Expand Down
Loading