Skip to content

Commit

Permalink
fix(kuma-cp): don't cache filtered data (backport #5574) (#5636)
Browse files Browse the repository at this point in the history
* test(kuma-cp): fix wait for goroutine to be done (backport #5638) (#5648)

Signed-off-by: Lukasz Dziedziak <[email protected]>
  • Loading branch information
mergify[bot] authored and lukidzi committed Jan 11, 2023
1 parent 71a242d commit 0fc9d4f
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 11 deletions.
3 changes: 3 additions & 0 deletions pkg/core/resources/manager/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ func (c *cachedManager) Get(ctx context.Context, res model.Resource, fs ...store

func (c *cachedManager) List(ctx context.Context, list model.ResourceList, fs ...store.ListOptionsFunc) error {
opts := store.NewListOptions(fs...)
if !opts.IsCacheable() {
return fmt.Errorf("filter functions are not allowed for cached store")
}
cacheKey := fmt.Sprintf("LIST:%s:%s", list.GetItemType(), opts.HashCode())
obj, found := c.cache.Get(cacheKey)
if !found {
Expand Down
73 changes: 63 additions & 10 deletions pkg/core/resources/manager/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package manager_test
import (
"context"
"sync"
"sync/atomic"
"time"

. "github.com/onsi/ginkgo"
Expand All @@ -22,12 +23,12 @@ import (

type countingResourcesManager struct {
store core_store.ResourceStore
getQueries int
listQueries int
getQueries uint32
listQueries uint32
}

func (c *countingResourcesManager) Get(ctx context.Context, res core_model.Resource, fn ...core_store.GetOptionsFunc) error {
c.getQueries++
atomic.AddUint32(&c.getQueries, 1)
return c.store.Get(ctx, res, fn...)
}

Expand All @@ -36,7 +37,7 @@ func (c *countingResourcesManager) List(ctx context.Context, list core_model.Res
if list.GetItemType() == core_mesh.TrafficLogType && opts.Mesh == "slow" {
time.Sleep(10 * time.Second)
}
c.listQueries++
atomic.AddUint32(&c.listQueries, 1)
return c.store.List(ctx, list, fn...)
}

Expand Down Expand Up @@ -102,14 +103,14 @@ var _ = Describe("Cached Resource Manager", func() {

// then real manager should be called only once
Expect(fetch().Spec).To(MatchProto(res.Spec))
Expect(countingManager.getQueries).To(Equal(1))
Expect(int(countingManager.getQueries)).To(Equal(1))

// when
time.Sleep(expiration)

// then
Expect(fetch().Spec).To(MatchProto(res.Spec))
Expect(countingManager.getQueries).To(Equal(2))
Expect(int(countingManager.getQueries)).To(Equal(2))

// and metrics are published
Expect(test_metrics.FindMetric(metrics, "store_cache", "operation", "get", "result", "miss").Counter.GetValue()).To(Equal(2.0))
Expand Down Expand Up @@ -139,7 +140,7 @@ var _ = Describe("Cached Resource Manager", func() {
wg.Wait()

// then real manager should be called every time
Expect(countingManager.getQueries).To(Equal(100))
Expect(int(countingManager.getQueries)).To(Equal(100))
})

It("should cache List() queries", func() {
Expand All @@ -165,7 +166,7 @@ var _ = Describe("Cached Resource Manager", func() {
list := fetch()
Expect(list.Items).To(HaveLen(1))
Expect(list.Items[0].GetSpec()).To(MatchProto(res.Spec))
Expect(countingManager.listQueries).To(Equal(1))
Expect(int(countingManager.listQueries)).To(Equal(1))

// when
time.Sleep(expiration)
Expand All @@ -174,7 +175,7 @@ var _ = Describe("Cached Resource Manager", func() {
list = fetch()
Expect(list.Items).To(HaveLen(1))
Expect(list.Items[0].GetSpec()).To(MatchProto(res.Spec))
Expect(countingManager.listQueries).To(Equal(2))
Expect(int(countingManager.listQueries)).To(Equal(2))

// and metrics are published
Expect(test_metrics.FindMetric(metrics, "store_cache", "operation", "list", "result", "miss").Counter.GetValue()).To(Equal(2.0))
Expand All @@ -187,12 +188,14 @@ var _ = Describe("Cached Resource Manager", func() {
Expect(hits + hitWaits).To(Equal(100.0))
})

It("should let concurrent List() queries for different types and meshes", test.Within(5*time.Second, func() {
It("should let concurrent List() queries for different types and meshes", test.Within(15*time.Second, func() {
// given ongoing TrafficLog from mesh slow that takes a lot of time to complete
done := make(chan struct{})
go func() {
fetched := core_mesh.TrafficLogResourceList{}
err := cachedManager.List(context.Background(), &fetched, core_store.ListByMesh("slow"))
Expect(err).ToNot(HaveOccurred())
close(done)
}()

// when trying to fetch TrafficLog from different mesh that takes normal time to response
Expand All @@ -208,5 +211,55 @@ var _ = Describe("Cached Resource Manager", func() {

// then first request does not block request for other type
Expect(err).ToNot(HaveOccurred())
<-done
}))

It("should cache List() at different key when ordered", test.Within(5*time.Second, func() {
// when fetched resources multiple times
fetch := func(ordered bool) core_mesh.DataplaneResourceList {
fetched := core_mesh.DataplaneResourceList{}
var err error
if ordered {
err = cachedManager.List(context.Background(), &fetched, core_store.ListOrdered(), core_store.ListByMesh("default"))
} else {
err = cachedManager.List(context.Background(), &fetched, core_store.ListByMesh("default"))
}
Expect(err).ToNot(HaveOccurred())
return fetched
}

var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
fetch(false)
wg.Done()
}()
}
wg.Wait()

// then real manager should be called only once
list := fetch(false)
Expect(list.Items).To(HaveLen(1))
Expect(list.Items[0].GetSpec()).To(MatchProto(res.Spec))
Expect(int(countingManager.listQueries)).To(Equal(1))

// when call for ordered data
list = fetch(true)

// then real manager should be called
Expect(list.Items).To(HaveLen(1))
Expect(list.Items[0].GetSpec()).To(MatchProto(res.Spec))
Expect(int(countingManager.listQueries)).To(Equal(2))

// and metrics are published
Expect(test_metrics.FindMetric(metrics, "store_cache", "operation", "list", "result", "miss").Counter.GetValue()).To(Equal(2.0))
hits := test_metrics.FindMetric(metrics, "store_cache", "operation", "list", "result", "hit").Counter.GetValue()
hitWaits := 0.0
hitWaitMetric := test_metrics.FindMetric(metrics, "store_cache", "operation", "list", "result", "hit-wait")
if hitWaitMetric != nil {
hitWaits = hitWaitMetric.Counter.GetValue()
}
Expect(hits + hitWaits).To(Equal(100.0))
}))
})
6 changes: 5 additions & 1 deletion pkg/core/resources/store/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,10 @@ func ListOrdered() ListOptionsFunc {
}
}

func (l *ListOptions) IsCacheable() bool {
return l.FilterFunc == nil
}

func (l *ListOptions) HashCode() string {
return l.Mesh
return fmt.Sprintf("%s:%t:%d:%s", l.Mesh, l.Ordered, l.PageSize, l.PageOffset)
}

0 comments on commit 0fc9d4f

Please sign in to comment.