Skip to content

Commit

Permalink
Clean expired cache entries periodically
Browse files Browse the repository at this point in the history
Regularly clean up of cache entries that have expired for a more efficient use of memory.
Introduce two new parameters to tune clean up frequency and threshold for forced FIFO eviction.

Fixes open-policy-agent#5320

Signed-off-by: Rudrakh Panigrahi <[email protected]>
  • Loading branch information
rudrakhp committed Jan 9, 2024
1 parent ea1ecdf commit 264ff95
Show file tree
Hide file tree
Showing 7 changed files with 349 additions and 42 deletions.
2 changes: 2 additions & 0 deletions runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,8 @@ func (rt *Runtime) Serve(ctx context.Context) error {
})
}

ctx, cancel := context.WithCancel(ctx)
defer cancel()
rt.server, err = rt.server.Init(ctx)
if err != nil {
rt.logger.WithFields(map[string]interface{}{"err": err}).Error("Unable to initialize server.")
Expand Down
2 changes: 1 addition & 1 deletion sdk/opa.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (opa *OPA) configure(ctx context.Context, bs []byte, ready chan struct{}, b

opa.state.manager = manager
opa.state.queryCache.Clear()
opa.state.interQueryBuiltinCache = cache.NewInterQueryCache(manager.InterQueryBuiltinCacheConfig())
opa.state.interQueryBuiltinCache = cache.NewInterQueryCacheWithContext(ctx, manager.InterQueryBuiltinCacheConfig())
opa.config = bs

return nil
Expand Down
6 changes: 3 additions & 3 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func New() *Server {
// Init initializes the server. This function MUST be called before starting any loops
// from s.Listeners().
func (s *Server) Init(ctx context.Context) (*Server, error) {
s.initRouters()
s.initRouters(ctx)

txn, err := s.store.NewTransaction(ctx, storage.WriteParams)
if err != nil {
Expand Down Expand Up @@ -755,7 +755,7 @@ func (s *Server) initHandlerCompression(handler http.Handler) (http.Handler, err
return compressHandler, nil
}

func (s *Server) initRouters() {
func (s *Server) initRouters(ctx context.Context) {
mainRouter := s.router
if mainRouter == nil {
mainRouter = mux.NewRouter()
Expand All @@ -764,7 +764,7 @@ func (s *Server) initRouters() {
diagRouter := mux.NewRouter()

// authorizer, if configured, needs the iCache to be set up already
s.interQueryBuiltinCache = iCache.NewInterQueryCache(s.manager.InterQueryBuiltinCacheConfig())
s.interQueryBuiltinCache = iCache.NewInterQueryCacheWithContext(ctx, s.manager.InterQueryBuiltinCacheConfig())
s.manager.RegisterCacheTrigger(s.updateCacheConfig)

// Add authorization handler. This must come BEFORE authentication handler
Expand Down
140 changes: 125 additions & 15 deletions topdown/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,20 @@ package cache

import (
"container/list"
"context"
"fmt"
"math"
"sync"
"time"

"github.com/open-policy-agent/opa/ast"

"github.com/open-policy-agent/opa/util"
)

const (
defaultMaxSizeBytes = int64(0) // unlimited
defaultMaxSizeBytes = int64(0) // unlimited
defaultForcedEvictionThresholdPercentage = int64(100) // trigger at max_size_bytes
defaultStaleEntryEvictionPeriodSeconds = int64(0) // never
)

// Config represents the configuration of the inter-query cache.
Expand All @@ -24,16 +29,25 @@ type Config struct {
}

// InterQueryBuiltinCacheConfig represents the configuration of the inter-query cache that built-in functions can utilize.
// MaxSizeBytes - max capacity of cache in bytes
// ForcedEvictionThresholdPercentage - capacity usage in percentage after which forced FIFO eviction starts
// StaleEntryEvictionPeriodSeconds - time period between end of previous and start of new stale entry eviction routine
type InterQueryBuiltinCacheConfig struct {
MaxSizeBytes *int64 `json:"max_size_bytes,omitempty"`
MaxSizeBytes *int64 `json:"max_size_bytes,omitempty"`
ForcedEvictionThresholdPercentage *int64 `json:"forced_eviction_threshold_percentage,omitempty"`
StaleEntryEvictionPeriodSeconds *int64 `json:"stale_entry_eviction_period_seconds,omitempty"`
}

// ParseCachingConfig returns the config for the inter-query cache.
func ParseCachingConfig(raw []byte) (*Config, error) {
if raw == nil {
maxSize := new(int64)
*maxSize = defaultMaxSizeBytes
return &Config{InterQueryBuiltinCache: InterQueryBuiltinCacheConfig{MaxSizeBytes: maxSize}}, nil
threshold := new(int64)
*threshold = defaultForcedEvictionThresholdPercentage
period := new(int64)
*period = defaultStaleEntryEvictionPeriodSeconds
return &Config{InterQueryBuiltinCache: InterQueryBuiltinCacheConfig{MaxSizeBytes: maxSize, ForcedEvictionThresholdPercentage: threshold, StaleEntryEvictionPeriodSeconds: period}}, nil
}

var config Config
Expand All @@ -55,6 +69,26 @@ func (c *Config) validateAndInjectDefaults() error {
*maxSize = defaultMaxSizeBytes
c.InterQueryBuiltinCache.MaxSizeBytes = maxSize
}
if c.InterQueryBuiltinCache.ForcedEvictionThresholdPercentage == nil {
threshold := new(int64)
*threshold = defaultForcedEvictionThresholdPercentage
c.InterQueryBuiltinCache.ForcedEvictionThresholdPercentage = threshold
} else {
threshold := *c.InterQueryBuiltinCache.ForcedEvictionThresholdPercentage
if threshold < 0 || threshold > 100 {
return fmt.Errorf("invalid forced_eviction_threshold_percentage %v", threshold)
}
}
if c.InterQueryBuiltinCache.StaleEntryEvictionPeriodSeconds == nil {
period := new(int64)
*period = defaultStaleEntryEvictionPeriodSeconds
c.InterQueryBuiltinCache.StaleEntryEvictionPeriodSeconds = period
} else {
period := *c.InterQueryBuiltinCache.StaleEntryEvictionPeriodSeconds
if period < 0 {
return fmt.Errorf("invalid stale_entry_eviction_period_seconds %v", period)
}
}
return nil
}

Expand All @@ -68,23 +102,55 @@ type InterQueryCacheValue interface {
type InterQueryCache interface {
Get(key ast.Value) (value InterQueryCacheValue, found bool)
Insert(key ast.Value, value InterQueryCacheValue) int
InsertWithExpiry(key ast.Value, value InterQueryCacheValue, expiresAt time.Time) int
Delete(key ast.Value)
UpdateConfig(config *Config)
Clone(value InterQueryCacheValue) (InterQueryCacheValue, error)
}

// NewInterQueryCache returns a new inter-query cache.
// The cache uses a FIFO eviction policy when it reaches the forced eviction threshold.
// Parameters:
//
// config - to configure the InterQueryCache
func NewInterQueryCache(config *Config) InterQueryCache {
return &cache{
items: map[string]cacheItem{},
usage: 0,
config: config,
l: list.New(),
return newCache(config)
}

// NewInterQueryCacheWithContext returns a new inter-query cache with context.
// The cache uses a combination of FIFO eviction policy when it reaches the forced eviction threshold
// and a periodic cleanup routine to remove stale entries that exceed their expiration time, if specified.
// If configured with a zero stale_entry_eviction_period_seconds value, the stale entry cleanup routine is disabled.
//
// Parameters:
//
// ctx - used to control lifecycle of the stale entry cleanup routine
// config - to configure the InterQueryCache
func NewInterQueryCacheWithContext(ctx context.Context, config *Config) InterQueryCache {
iqCache := newCache(config)
if iqCache.staleEntryEvictionTimePeriodSeconds() > 0 {
cleanupTicker := time.NewTicker(time.Duration(iqCache.staleEntryEvictionTimePeriodSeconds()) * time.Second)
go func() {
for {
select {
case <-cleanupTicker.C:
cleanupTicker.Stop()
iqCache.cleanStaleValues()
cleanupTicker = time.NewTicker(time.Duration(iqCache.staleEntryEvictionTimePeriodSeconds()) * time.Second)
case <-ctx.Done():
cleanupTicker.Stop()
return
}
}
}()
}

return iqCache
}

type cacheItem struct {
value InterQueryCacheValue
expiresAt time.Time
keyElement *list.Element
}

Expand All @@ -96,11 +162,26 @@ type cache struct {
mtx sync.Mutex
}

// Insert inserts a key k into the cache with value v.
func (c *cache) Insert(k ast.Value, v InterQueryCacheValue) (dropped int) {
func newCache(config *Config) *cache {
return &cache{
items: map[string]cacheItem{},
usage: 0,
config: config,
l: list.New(),
}
}

// InsertWithExpiry inserts a key k into the cache with value v with an expiration time expiresAt.
// A zero time value for expiresAt indicates no expiry
func (c *cache) InsertWithExpiry(k ast.Value, v InterQueryCacheValue, expiresAt time.Time) (dropped int) {
c.mtx.Lock()
defer c.mtx.Unlock()
return c.unsafeInsert(k, v)
return c.unsafeInsert(k, v, expiresAt)
}

// Insert inserts a key k into the cache with value v with no expiration time.
func (c *cache) Insert(k ast.Value, v InterQueryCacheValue) (dropped int) {
return c.InsertWithExpiry(k, v, time.Time{})
}

// Get returns the value in the cache for k.
Expand Down Expand Up @@ -137,10 +218,9 @@ func (c *cache) Clone(value InterQueryCacheValue) (InterQueryCacheValue, error)
return c.unsafeClone(value)
}

func (c *cache) unsafeInsert(k ast.Value, v InterQueryCacheValue) (dropped int) {
func (c *cache) unsafeInsert(k ast.Value, v InterQueryCacheValue, expiresAt time.Time) (dropped int) {
size := v.SizeInBytes()
limit := c.maxSizeBytes()

limit := int64(math.Ceil(float64(c.forcedEvictionThresholdPercentage())/100.0) * (float64(c.maxSizeBytes())))
if limit > 0 {
if size > limit {
dropped++
Expand All @@ -159,6 +239,7 @@ func (c *cache) unsafeInsert(k ast.Value, v InterQueryCacheValue) (dropped int)

c.items[k.String()] = cacheItem{
value: v,
expiresAt: expiresAt,
keyElement: c.l.PushBack(k),
}
c.usage += size
Expand Down Expand Up @@ -191,3 +272,32 @@ func (c *cache) maxSizeBytes() int64 {
}
return *c.config.InterQueryBuiltinCache.MaxSizeBytes
}

func (c *cache) forcedEvictionThresholdPercentage() int64 {
if c.config == nil {
return defaultForcedEvictionThresholdPercentage
}
return *c.config.InterQueryBuiltinCache.ForcedEvictionThresholdPercentage
}

func (c *cache) staleEntryEvictionTimePeriodSeconds() int64 {
if c.config == nil {
return defaultStaleEntryEvictionPeriodSeconds
}
return *c.config.InterQueryBuiltinCache.StaleEntryEvictionPeriodSeconds
}

func (c *cache) cleanStaleValues() (dropped int) {
c.mtx.Lock()
defer c.mtx.Unlock()
for key := c.l.Front(); key != nil; {
nextKey := key.Next()
// if expiresAt is zero, the item doesn't have an expiry
if ea := c.items[(key.Value.(ast.Value)).String()].expiresAt; !ea.IsZero() && ea.Before(time.Now()) {
c.unsafeDelete(key.Value.(ast.Value))
dropped++
}
key = nextKey
}
return dropped
}
Loading

0 comments on commit 264ff95

Please sign in to comment.