From 264ff95634cd0ce643d6821d1cd0e740a6fb0602 Mon Sep 17 00:00:00 2001 From: Rudrakh Panigrahi Date: Sat, 18 Nov 2023 12:04:58 +0530 Subject: [PATCH] Clean expired cache entries periodically Regularly clean up of cache entries that have expired for a more efficient use of memory. Introduce two new parameters to tune clean up frequency and threshold for forced FIFO eviction. Fixes #5320 Signed-off-by: Rudrakh Panigrahi --- runtime/runtime.go | 2 + sdk/opa.go | 2 +- server/server.go | 6 +- topdown/cache/cache.go | 140 +++++++++++++++++++++++--- topdown/cache/cache_test.go | 194 +++++++++++++++++++++++++++++++++++- topdown/http.go | 19 ++-- topdown/http_test.go | 28 +++--- 7 files changed, 349 insertions(+), 42 deletions(-) diff --git a/runtime/runtime.go b/runtime/runtime.go index a3fb495f09..988db0a37b 100644 --- a/runtime/runtime.go +++ b/runtime/runtime.go @@ -581,6 +581,8 @@ func (rt *Runtime) Serve(ctx context.Context) error { }) } + ctx, cancel := context.WithCancel(ctx) + defer cancel() rt.server, err = rt.server.Init(ctx) if err != nil { rt.logger.WithFields(map[string]interface{}{"err": err}).Error("Unable to initialize server.") diff --git a/sdk/opa.go b/sdk/opa.go index f7e2cccaa1..93ad613896 100644 --- a/sdk/opa.go +++ b/sdk/opa.go @@ -212,7 +212,7 @@ func (opa *OPA) configure(ctx context.Context, bs []byte, ready chan struct{}, b opa.state.manager = manager opa.state.queryCache.Clear() - opa.state.interQueryBuiltinCache = cache.NewInterQueryCache(manager.InterQueryBuiltinCacheConfig()) + opa.state.interQueryBuiltinCache = cache.NewInterQueryCacheWithContext(ctx, manager.InterQueryBuiltinCacheConfig()) opa.config = bs return nil diff --git a/server/server.go b/server/server.go index bd8dc727f0..9d9c9d6cdf 100644 --- a/server/server.go +++ b/server/server.go @@ -184,7 +184,7 @@ func New() *Server { // Init initializes the server. This function MUST be called before starting any loops // from s.Listeners(). func (s *Server) Init(ctx context.Context) (*Server, error) { - s.initRouters() + s.initRouters(ctx) txn, err := s.store.NewTransaction(ctx, storage.WriteParams) if err != nil { @@ -755,7 +755,7 @@ func (s *Server) initHandlerCompression(handler http.Handler) (http.Handler, err return compressHandler, nil } -func (s *Server) initRouters() { +func (s *Server) initRouters(ctx context.Context) { mainRouter := s.router if mainRouter == nil { mainRouter = mux.NewRouter() @@ -764,7 +764,7 @@ func (s *Server) initRouters() { diagRouter := mux.NewRouter() // authorizer, if configured, needs the iCache to be set up already - s.interQueryBuiltinCache = iCache.NewInterQueryCache(s.manager.InterQueryBuiltinCacheConfig()) + s.interQueryBuiltinCache = iCache.NewInterQueryCacheWithContext(ctx, s.manager.InterQueryBuiltinCacheConfig()) s.manager.RegisterCacheTrigger(s.updateCacheConfig) // Add authorization handler. This must come BEFORE authentication handler diff --git a/topdown/cache/cache.go b/topdown/cache/cache.go index f9d2bcff75..c83c9828bf 100644 --- a/topdown/cache/cache.go +++ b/topdown/cache/cache.go @@ -7,15 +7,20 @@ package cache import ( "container/list" + "context" + "fmt" + "math" "sync" + "time" "github.com/open-policy-agent/opa/ast" - "github.com/open-policy-agent/opa/util" ) const ( - defaultMaxSizeBytes = int64(0) // unlimited + defaultMaxSizeBytes = int64(0) // unlimited + defaultForcedEvictionThresholdPercentage = int64(100) // trigger at max_size_bytes + defaultStaleEntryEvictionPeriodSeconds = int64(0) // never ) // Config represents the configuration of the inter-query cache. @@ -24,8 +29,13 @@ type Config struct { } // InterQueryBuiltinCacheConfig represents the configuration of the inter-query cache that built-in functions can utilize. +// MaxSizeBytes - max capacity of cache in bytes +// ForcedEvictionThresholdPercentage - capacity usage in percentage after which forced FIFO eviction starts +// StaleEntryEvictionPeriodSeconds - time period between end of previous and start of new stale entry eviction routine type InterQueryBuiltinCacheConfig struct { - MaxSizeBytes *int64 `json:"max_size_bytes,omitempty"` + MaxSizeBytes *int64 `json:"max_size_bytes,omitempty"` + ForcedEvictionThresholdPercentage *int64 `json:"forced_eviction_threshold_percentage,omitempty"` + StaleEntryEvictionPeriodSeconds *int64 `json:"stale_entry_eviction_period_seconds,omitempty"` } // ParseCachingConfig returns the config for the inter-query cache. @@ -33,7 +43,11 @@ func ParseCachingConfig(raw []byte) (*Config, error) { if raw == nil { maxSize := new(int64) *maxSize = defaultMaxSizeBytes - return &Config{InterQueryBuiltinCache: InterQueryBuiltinCacheConfig{MaxSizeBytes: maxSize}}, nil + threshold := new(int64) + *threshold = defaultForcedEvictionThresholdPercentage + period := new(int64) + *period = defaultStaleEntryEvictionPeriodSeconds + return &Config{InterQueryBuiltinCache: InterQueryBuiltinCacheConfig{MaxSizeBytes: maxSize, ForcedEvictionThresholdPercentage: threshold, StaleEntryEvictionPeriodSeconds: period}}, nil } var config Config @@ -55,6 +69,26 @@ func (c *Config) validateAndInjectDefaults() error { *maxSize = defaultMaxSizeBytes c.InterQueryBuiltinCache.MaxSizeBytes = maxSize } + if c.InterQueryBuiltinCache.ForcedEvictionThresholdPercentage == nil { + threshold := new(int64) + *threshold = defaultForcedEvictionThresholdPercentage + c.InterQueryBuiltinCache.ForcedEvictionThresholdPercentage = threshold + } else { + threshold := *c.InterQueryBuiltinCache.ForcedEvictionThresholdPercentage + if threshold < 0 || threshold > 100 { + return fmt.Errorf("invalid forced_eviction_threshold_percentage %v", threshold) + } + } + if c.InterQueryBuiltinCache.StaleEntryEvictionPeriodSeconds == nil { + period := new(int64) + *period = defaultStaleEntryEvictionPeriodSeconds + c.InterQueryBuiltinCache.StaleEntryEvictionPeriodSeconds = period + } else { + period := *c.InterQueryBuiltinCache.StaleEntryEvictionPeriodSeconds + if period < 0 { + return fmt.Errorf("invalid stale_entry_eviction_period_seconds %v", period) + } + } return nil } @@ -68,23 +102,55 @@ type InterQueryCacheValue interface { type InterQueryCache interface { Get(key ast.Value) (value InterQueryCacheValue, found bool) Insert(key ast.Value, value InterQueryCacheValue) int + InsertWithExpiry(key ast.Value, value InterQueryCacheValue, expiresAt time.Time) int Delete(key ast.Value) UpdateConfig(config *Config) Clone(value InterQueryCacheValue) (InterQueryCacheValue, error) } // NewInterQueryCache returns a new inter-query cache. +// The cache uses a FIFO eviction policy when it reaches the forced eviction threshold. +// Parameters: +// +// config - to configure the InterQueryCache func NewInterQueryCache(config *Config) InterQueryCache { - return &cache{ - items: map[string]cacheItem{}, - usage: 0, - config: config, - l: list.New(), + return newCache(config) +} + +// NewInterQueryCacheWithContext returns a new inter-query cache with context. +// The cache uses a combination of FIFO eviction policy when it reaches the forced eviction threshold +// and a periodic cleanup routine to remove stale entries that exceed their expiration time, if specified. +// If configured with a zero stale_entry_eviction_period_seconds value, the stale entry cleanup routine is disabled. +// +// Parameters: +// +// ctx - used to control lifecycle of the stale entry cleanup routine +// config - to configure the InterQueryCache +func NewInterQueryCacheWithContext(ctx context.Context, config *Config) InterQueryCache { + iqCache := newCache(config) + if iqCache.staleEntryEvictionTimePeriodSeconds() > 0 { + cleanupTicker := time.NewTicker(time.Duration(iqCache.staleEntryEvictionTimePeriodSeconds()) * time.Second) + go func() { + for { + select { + case <-cleanupTicker.C: + cleanupTicker.Stop() + iqCache.cleanStaleValues() + cleanupTicker = time.NewTicker(time.Duration(iqCache.staleEntryEvictionTimePeriodSeconds()) * time.Second) + case <-ctx.Done(): + cleanupTicker.Stop() + return + } + } + }() } + + return iqCache } type cacheItem struct { value InterQueryCacheValue + expiresAt time.Time keyElement *list.Element } @@ -96,11 +162,26 @@ type cache struct { mtx sync.Mutex } -// Insert inserts a key k into the cache with value v. -func (c *cache) Insert(k ast.Value, v InterQueryCacheValue) (dropped int) { +func newCache(config *Config) *cache { + return &cache{ + items: map[string]cacheItem{}, + usage: 0, + config: config, + l: list.New(), + } +} + +// InsertWithExpiry inserts a key k into the cache with value v with an expiration time expiresAt. +// A zero time value for expiresAt indicates no expiry +func (c *cache) InsertWithExpiry(k ast.Value, v InterQueryCacheValue, expiresAt time.Time) (dropped int) { c.mtx.Lock() defer c.mtx.Unlock() - return c.unsafeInsert(k, v) + return c.unsafeInsert(k, v, expiresAt) +} + +// Insert inserts a key k into the cache with value v with no expiration time. +func (c *cache) Insert(k ast.Value, v InterQueryCacheValue) (dropped int) { + return c.InsertWithExpiry(k, v, time.Time{}) } // Get returns the value in the cache for k. @@ -137,10 +218,9 @@ func (c *cache) Clone(value InterQueryCacheValue) (InterQueryCacheValue, error) return c.unsafeClone(value) } -func (c *cache) unsafeInsert(k ast.Value, v InterQueryCacheValue) (dropped int) { +func (c *cache) unsafeInsert(k ast.Value, v InterQueryCacheValue, expiresAt time.Time) (dropped int) { size := v.SizeInBytes() - limit := c.maxSizeBytes() - + limit := int64(math.Ceil(float64(c.forcedEvictionThresholdPercentage())/100.0) * (float64(c.maxSizeBytes()))) if limit > 0 { if size > limit { dropped++ @@ -159,6 +239,7 @@ func (c *cache) unsafeInsert(k ast.Value, v InterQueryCacheValue) (dropped int) c.items[k.String()] = cacheItem{ value: v, + expiresAt: expiresAt, keyElement: c.l.PushBack(k), } c.usage += size @@ -191,3 +272,32 @@ func (c *cache) maxSizeBytes() int64 { } return *c.config.InterQueryBuiltinCache.MaxSizeBytes } + +func (c *cache) forcedEvictionThresholdPercentage() int64 { + if c.config == nil { + return defaultForcedEvictionThresholdPercentage + } + return *c.config.InterQueryBuiltinCache.ForcedEvictionThresholdPercentage +} + +func (c *cache) staleEntryEvictionTimePeriodSeconds() int64 { + if c.config == nil { + return defaultStaleEntryEvictionPeriodSeconds + } + return *c.config.InterQueryBuiltinCache.StaleEntryEvictionPeriodSeconds +} + +func (c *cache) cleanStaleValues() (dropped int) { + c.mtx.Lock() + defer c.mtx.Unlock() + for key := c.l.Front(); key != nil; { + nextKey := key.Next() + // if expiresAt is zero, the item doesn't have an expiry + if ea := c.items[(key.Value.(ast.Value)).String()].expiresAt; !ea.IsZero() && ea.Before(time.Now()) { + c.unsafeDelete(key.Value.(ast.Value)) + dropped++ + } + key = nextKey + } + return dropped +} diff --git a/topdown/cache/cache_test.go b/topdown/cache/cache_test.go index 85375e93f0..85ccb20913 100644 --- a/topdown/cache/cache_test.go +++ b/topdown/cache/cache_test.go @@ -5,9 +5,11 @@ package cache import ( + "context" "reflect" "sync" "testing" + "time" "github.com/open-policy-agent/opa/ast" ) @@ -15,7 +17,11 @@ import ( func TestParseCachingConfig(t *testing.T) { maxSize := new(int64) *maxSize = defaultMaxSizeBytes - expected := &Config{InterQueryBuiltinCache: InterQueryBuiltinCacheConfig{MaxSizeBytes: maxSize}} + period := new(int64) + *period = defaultStaleEntryEvictionPeriodSeconds + threshold := new(int64) + *threshold = defaultForcedEvictionThresholdPercentage + expected := &Config{InterQueryBuiltinCache: InterQueryBuiltinCacheConfig{MaxSizeBytes: maxSize, StaleEntryEvictionPeriodSeconds: period, ForcedEvictionThresholdPercentage: threshold}} tests := map[string]struct { input []byte @@ -277,6 +283,184 @@ func TestDelete(t *testing.T) { verifyCacheList(t, cache) } +func TestInsertWithExpiryAndEviction(t *testing.T) { + // 50 byte max size + // 1s stale cleanup period + // 80% threshold to for FIFO eviction (eviction after 40 bytes) + in := `{"inter_query_builtin_cache": {"max_size_bytes": 50, "stale_entry_eviction_period_seconds": 1, "forced_eviction_threshold_percentage": 80},}` + + config, err := ParseCachingConfig([]byte(in)) + if err != nil { + t.Fatalf("Unexpected error %v", err) + } + + cache := NewInterQueryCacheWithContext(context.Background(), config) + + cacheValue := newInterQueryCacheValue(ast.StringTerm("bar").Value, 20) + cache.InsertWithExpiry(ast.StringTerm("force_evicted_foo").Value, cacheValue, time.Now().Add(100*time.Second)) + if fetchedCacheValue, found := cache.Get(ast.StringTerm("force_evicted_foo").Value); !found { + t.Fatalf("Expected cache entry with value %v, found %v", cacheValue, fetchedCacheValue) + } + cache.InsertWithExpiry(ast.StringTerm("expired_foo").Value, cacheValue, time.Now().Add(1*time.Second)) + if fetchedCacheValue, found := cache.Get(ast.StringTerm("expired_foo").Value); !found { + t.Fatalf("Expected cache entry with value %v, found %v", cacheValue, fetchedCacheValue) + } + cache.InsertWithExpiry(ast.StringTerm("foo").Value, cacheValue, time.Now().Add(10*time.Second)) + if fetchedCacheValue, found := cache.Get(ast.StringTerm("foo").Value); !found { + t.Fatalf("Expected cache entry with value %v, found %v", cacheValue, fetchedCacheValue) + } + + // Ensure stale entries clean up routine runs at least once + time.Sleep(2 * time.Second) + + // Entry deleted even though not expired because force evicted when foo is inserted + if fetchedCacheValue, found := cache.Get(ast.StringTerm("force_evicted_foo").Value); found { + t.Fatalf("Didn't expect cache entry for force_evicted_foo, found entry with value %v", fetchedCacheValue) + } + // Stale clean up routine runs and deletes expired entry + if fetchedCacheValue, found := cache.Get(ast.StringTerm("expired_foo").Value); found { + t.Fatalf("Didn't expect cache entry for expired_foo, found entry with value %v", fetchedCacheValue) + } + // Stale clean up routine runs but doesn't delete the entry + if fetchedCacheValue, found := cache.Get(ast.StringTerm("foo").Value); !found { + t.Fatalf("Expected cache entry with value %v for foo, found %v", cacheValue, fetchedCacheValue) + } +} + +func TestInsertHighTTLWithStaleEntryCleanup(t *testing.T) { + // 40 byte max size + // 1s stale cleanup period + // 100% threshold to for FIFO eviction (eviction after 40 bytes) + in := `{"inter_query_builtin_cache": {"max_size_bytes": 40, "stale_entry_eviction_period_seconds": 1, "forced_eviction_threshold_percentage": 100},}` + + config, err := ParseCachingConfig([]byte(in)) + if err != nil { + t.Fatalf("Unexpected error %v", err) + } + + cache := NewInterQueryCacheWithContext(context.Background(), config) + + cacheValue := newInterQueryCacheValue(ast.StringTerm("bar").Value, 20) + cache.InsertWithExpiry(ast.StringTerm("high_ttl_foo").Value, cacheValue, time.Now().Add(100*time.Second)) + if fetchedCacheValue, found := cache.Get(ast.StringTerm("high_ttl_foo").Value); !found { + t.Fatalf("Expected cache entry with value %v, found %v", cacheValue, fetchedCacheValue) + } + cache.InsertWithExpiry(ast.StringTerm("expired_foo").Value, cacheValue, time.Now().Add(1*time.Second)) + if fetchedCacheValue, found := cache.Get(ast.StringTerm("expired_foo").Value); !found { + t.Fatalf("Expected cache entry with value %v, found no entry", fetchedCacheValue) + } + + // Ensure stale entries clean up routine runs at least once + time.Sleep(2 * time.Second) + + cache.InsertWithExpiry(ast.StringTerm("foo").Value, cacheValue, time.Now().Add(10*time.Second)) + if fetchedCacheValue, found := cache.Get(ast.StringTerm("foo").Value); !found { + t.Fatalf("Expected cache entry with value %v, found %v", cacheValue, fetchedCacheValue) + } + + // Since expired_foo is deleted by stale cleanup routine, high_ttl_foo is not evicted when foo is inserted + if fetchedCacheValue, found := cache.Get(ast.StringTerm("high_ttl_foo").Value); !found { + t.Fatalf("Expected cache entry with value %v for high_ttl_foo, found %v", cacheValue, fetchedCacheValue) + } + // Stale clean up routine runs and deletes expired entry + if fetchedCacheValue, found := cache.Get(ast.StringTerm("expired_foo").Value); found { + t.Fatalf("Didn't expect cache entry for expired_foo, found entry with value %v", fetchedCacheValue) + } +} + +func TestInsertHighTTLWithoutStaleEntryCleanup(t *testing.T) { + // 40 byte max size + // 0s stale cleanup period -> no cleanup + // 100% threshold to for FIFO eviction (eviction after 40 bytes) + in := `{"inter_query_builtin_cache": {"max_size_bytes": 40, "stale_entry_eviction_period_seconds": 0, "forced_eviction_threshold_percentage": 100},}` + + config, err := ParseCachingConfig([]byte(in)) + if err != nil { + t.Fatalf("Unexpected error %v", err) + } + + cache := NewInterQueryCacheWithContext(context.Background(), config) + + cacheValue := newInterQueryCacheValue(ast.StringTerm("bar").Value, 20) + cache.InsertWithExpiry(ast.StringTerm("high_ttl_foo").Value, cacheValue, time.Now().Add(100*time.Second)) + if fetchedCacheValue, found := cache.Get(ast.StringTerm("high_ttl_foo").Value); !found { + t.Fatalf("Expected cache entry with value %v for high_ttl_foo, found no entry", fetchedCacheValue) + } + cache.InsertWithExpiry(ast.StringTerm("expired_foo").Value, cacheValue, time.Now().Add(1*time.Second)) + if fetchedCacheValue, found := cache.Get(ast.StringTerm("expired_foo").Value); !found { + t.Fatalf("Expected cache entry with value %v for expired_foo, found no entry", fetchedCacheValue) + } + + cache.InsertWithExpiry(ast.StringTerm("foo").Value, cacheValue, time.Now().Add(10*time.Second)) + if fetchedCacheValue, found := cache.Get(ast.StringTerm("foo").Value); !found { + t.Fatalf("Expected cache entry with value %v for foo, found no entry", fetchedCacheValue) + } + + // Since stale cleanup routine is disabled, high_ttl_foo is evicted when foo is inserted + if fetchedCacheValue, found := cache.Get(ast.StringTerm("high_ttl_foo").Value); found { + t.Fatalf("Didn't expect cache entry for high_ttl_foo, found entry with value %v", fetchedCacheValue) + } + // Stale clean up disabled so expired entry exists + if fetchedCacheValue, found := cache.Get(ast.StringTerm("expired_foo").Value); !found { + t.Fatalf("Expected cache entry with value %v for expired_foo, found %v", cacheValue, fetchedCacheValue) + } +} + +func TestZeroExpiryTime(t *testing.T) { + // 20 byte max size + // 1s stale cleanup period + // 100% threshold to for FIFO eviction (eviction after 40 bytes) + in := `{"inter_query_builtin_cache": {"max_size_bytes": 20, "stale_entry_eviction_period_seconds": 1, "forced_eviction_threshold_percentage": 100},}` + + config, err := ParseCachingConfig([]byte(in)) + if err != nil { + t.Fatalf("Unexpected error %v", err) + } + + cache := NewInterQueryCacheWithContext(context.Background(), config) + cacheValue := newInterQueryCacheValue(ast.StringTerm("bar").Value, 20) + cache.InsertWithExpiry(ast.StringTerm("foo").Value, cacheValue, time.Time{}) + if fetchedCacheValue, found := cache.Get(ast.StringTerm("foo").Value); !found { + t.Fatalf("Expected cache entry with value %v for foo, found %v", cacheValue, fetchedCacheValue) + } + + time.Sleep(2 * time.Second) + + // Stale entry cleanup routine skips zero time cache entries + if fetchedCacheValue, found := cache.Get(ast.StringTerm("foo").Value); !found { + t.Fatalf("Expected cache entry with value %v for foo, found %v", cacheValue, fetchedCacheValue) + } +} + +func TestCancelNewInterQueryCacheWithContext(t *testing.T) { + // 40 byte max size + // 1s stale cleanup period + // 100% threshold to for FIFO eviction (eviction after 40 bytes) + in := `{"inter_query_builtin_cache": {"max_size_bytes": 40, "stale_entry_eviction_period_seconds": 1, "forced_eviction_threshold_percentage": 100},}` + + config, err := ParseCachingConfig([]byte(in)) + if err != nil { + t.Fatalf("Unexpected error %v", err) + } + + ctx, cancel := context.WithCancel(context.Background()) + cache := NewInterQueryCacheWithContext(ctx, config) + cacheValue := newInterQueryCacheValue(ast.StringTerm("bar").Value, 20) + cache.InsertWithExpiry(ast.StringTerm("foo").Value, cacheValue, time.Now().Add(100*time.Millisecond)) + if fetchedCacheValue, found := cache.Get(ast.StringTerm("foo").Value); !found { + t.Fatalf("Expected cache entry with value %v for foo, found %v", cacheValue, fetchedCacheValue) + } + + cancel() + time.Sleep(2 * time.Second) + + // Stale entry cleanup routine stopped as context was cancelled + if fetchedCacheValue, found := cache.Get(ast.StringTerm("foo").Value); !found { + t.Fatalf("Expected cache entry with value %v for foo, found %v", cacheValue, fetchedCacheValue) + } + +} + func TestUpdateConfig(t *testing.T) { config, err := ParseCachingConfig(nil) if err != nil { @@ -304,7 +488,7 @@ func TestUpdateConfig(t *testing.T) { } } -func TestDefaultMaxSizeBytes(t *testing.T) { +func TestDefaultConfigValues(t *testing.T) { c := NewInterQueryCache(nil) actualC, ok := c.(*cache) if !ok { @@ -313,6 +497,12 @@ func TestDefaultMaxSizeBytes(t *testing.T) { if actualC.maxSizeBytes() != defaultMaxSizeBytes { t.Fatal("Expected maxSizeBytes() to return default when config is nil") } + if actualC.forcedEvictionThresholdPercentage() != defaultForcedEvictionThresholdPercentage { + t.Fatal("Expected forcedEvictionThresholdPercentage() to return default when config is nil") + } + if actualC.staleEntryEvictionTimePeriodSeconds() != defaultStaleEntryEvictionPeriodSeconds { + t.Fatal("Expected staleEntryEvictionTimePeriodSeconds() to return default when config is nil") + } } // Verifies that the size of c.l is identical to the size of c.items diff --git a/topdown/http.go b/topdown/http.go index bf5dbb55d3..d4d67d85ec 100644 --- a/topdown/http.go +++ b/topdown/http.go @@ -888,7 +888,7 @@ func (c *interQueryCache) checkHTTPSendInterQueryCache() (ast.Value, error) { pcv = cachedRespData } - c.bctx.InterQueryBuiltinCache.Insert(c.key, pcv) + c.bctx.InterQueryBuiltinCache.InsertWithExpiry(c.key, pcv, cachedRespData.ExpiresAt) return cachedRespData.formatToAST(c.forceJSONDecode, c.forceYAMLDecode) } @@ -924,18 +924,19 @@ func insertIntoHTTPSendInterQueryCache(bctx BuiltinContext, key ast.Value, resp } var pcv cache.InterQueryCacheValue - + var pcvData *interQueryCacheData if cachingMode == defaultCachingMode { - pcv, err = newInterQueryCacheValue(bctx, resp, respBody, cacheParams) + pcv, pcvData, err = newInterQueryCacheValue(bctx, resp, respBody, cacheParams) } else { - pcv, err = newInterQueryCacheData(bctx, resp, respBody, cacheParams) + pcvData, err = newInterQueryCacheData(bctx, resp, respBody, cacheParams) + pcv = pcvData } if err != nil { return err } - requestCache.Insert(key, pcv) + requestCache.InsertWithExpiry(key, pcv, pcvData.ExpiresAt) return nil } @@ -1030,17 +1031,17 @@ type interQueryCacheValue struct { Data []byte } -func newInterQueryCacheValue(bctx BuiltinContext, resp *http.Response, respBody []byte, cacheParams *forceCacheParams) (*interQueryCacheValue, error) { +func newInterQueryCacheValue(bctx BuiltinContext, resp *http.Response, respBody []byte, cacheParams *forceCacheParams) (*interQueryCacheValue, *interQueryCacheData, error) { data, err := newInterQueryCacheData(bctx, resp, respBody, cacheParams) if err != nil { - return nil, err + return nil, nil, err } b, err := json.Marshal(data) if err != nil { - return nil, err + return nil, nil, err } - return &interQueryCacheValue{Data: b}, nil + return &interQueryCacheValue{Data: b}, data, nil } func (cb interQueryCacheValue) Clone() (cache.InterQueryCacheValue, error) { diff --git a/topdown/http_test.go b/topdown/http_test.go index 8f9c71b9e3..df59bd1ec7 100644 --- a/topdown/http_test.go +++ b/topdown/http_test.go @@ -1129,8 +1129,8 @@ func TestHTTPSendIntraQueryCaching(t *testing.T) { })) defer ts.Close() - config, _ := iCache.ParseCachingConfig(nil) - interQueryCache := iCache.NewInterQueryCache(config) + config, _ := iCache.ParseCachingConfig([]byte(`{"inter_query_builtin_cache": {"max_size_bytes": 500, "stale_entry_eviction_period_seconds": 1, "forced_eviction_threshold_percentage": 80},}`)) + interQueryCache := iCache.NewInterQueryCacheWithContext(context.Background(), config) opts := []func(*Query) *Query{ setTime(t0), @@ -1538,8 +1538,8 @@ func TestHTTPSendInterQueryForceCachingRefresh(t *testing.T) { request := strings.ReplaceAll(tc.request, "%URL%", ts.URL) request = strings.ReplaceAll(request, "%CACHE%", strconv.Itoa(cacheTime)) full := fmt.Sprintf("http.send(%s, x)", request) - config, _ := iCache.ParseCachingConfig(nil) - interQueryCache := iCache.NewInterQueryCache(config) + config, _ := iCache.ParseCachingConfig([]byte(`{"inter_query_builtin_cache": {"max_size_bytes": 500, "stale_entry_eviction_period_seconds": 1, "forced_eviction_threshold_percentage": 80},}`)) + interQueryCache := iCache.NewInterQueryCacheWithContext(context.Background(), config) q := NewQuery(ast.MustParseBody(full)). WithInterQueryBuiltinCache(interQueryCache). WithTime(t0) @@ -1598,7 +1598,7 @@ func TestHTTPSendInterQueryForceCachingRefresh(t *testing.T) { t.Fatal(err) } - interQueryCache.Insert(cacheKey, v) + interQueryCache.InsertWithExpiry(cacheKey, v, m.ExpiresAt) } actualCount := len(requests) @@ -1769,8 +1769,8 @@ func TestHTTPSendInterQueryCachingNewResp(t *testing.T) { } func newQuery(qStr string, t0 time.Time) *Query { - config, _ := iCache.ParseCachingConfig(nil) - interQueryCache := iCache.NewInterQueryCache(config) + config, _ := iCache.ParseCachingConfig([]byte(`{"inter_query_builtin_cache": {"max_size_bytes": 500, "stale_entry_eviction_period_seconds": 1, "forced_eviction_threshold_percentage": 80},}`)) + interQueryCache := iCache.NewInterQueryCacheWithContext(context.Background(), config) ctx := context.Background() store := inmem.New() txn := storage.NewTransactionOrDie(ctx, store) @@ -2159,7 +2159,7 @@ func TestNewInterQueryCacheValue(t *testing.T) { Body: io.NopCloser(bytes.NewBuffer(b)), } - result, err := newInterQueryCacheValue(BuiltinContext{}, response, b, &forceCacheParams{}) + result, _, err := newInterQueryCacheValue(BuiltinContext{}, response, b, &forceCacheParams{}) if err != nil { t.Fatalf("Unexpected error %v", err) } @@ -2934,8 +2934,8 @@ func TestHTTPSendCacheDefaultStatusCodesInterQueryCache(t *testing.T) { t.Run("non-cacheable status code: inter-query cache", func(t *testing.T) { // add an inter-query cache - config, _ := iCache.ParseCachingConfig(nil) - interQueryCache := iCache.NewInterQueryCache(config) + config, _ := iCache.ParseCachingConfig([]byte(`{"inter_query_builtin_cache": {"max_size_bytes": 500, "stale_entry_eviction_period_seconds": 1, "forced_eviction_threshold_percentage": 80},}`)) + interQueryCache := iCache.NewInterQueryCacheWithContext(context.Background(), config) m := metrics.New() @@ -2992,6 +2992,10 @@ func (c *onlyOnceInterQueryCache) Insert(_ ast.Value, _ iCache.InterQueryCacheVa return 0 } +func (c *onlyOnceInterQueryCache) InsertWithExpiry(_ ast.Value, _ iCache.InterQueryCacheValue, _ time.Time) int { + return 0 +} + func (c *onlyOnceInterQueryCache) Delete(_ ast.Value) {} func (c *onlyOnceInterQueryCache) UpdateConfig(_ *iCache.Config) {} @@ -3274,8 +3278,8 @@ func TestHTTPSendMetrics(t *testing.T) { t.Run("cache hits", func(t *testing.T) { // add an inter-query cache - config, _ := iCache.ParseCachingConfig(nil) - interQueryCache := iCache.NewInterQueryCache(config) + config, _ := iCache.ParseCachingConfig([]byte(`{"inter_query_builtin_cache": {"max_size_bytes": 500, "stale_entry_eviction_period_seconds": 1, "forced_eviction_threshold_percentage": 80},}`)) + interQueryCache := iCache.NewInterQueryCacheWithContext(context.Background(), config) // Execute query twice and verify http.send inter-query cache hit metric is incremented. m := metrics.New()