Skip to content

Commit

Permalink
Added LRUMutexCache restored LRUCache
Browse files Browse the repository at this point in the history
  • Loading branch information
thrawn01 committed May 15, 2024
1 parent 6dff201 commit cca0df8
Show file tree
Hide file tree
Showing 14 changed files with 985 additions and 138 deletions.
8 changes: 4 additions & 4 deletions algorithms.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func tokenBucket(ctx rateContext) (resp *RateLimitResp, err error) {
// If not in the cache, check the store if provided
if ctx.Store != nil && !ok {
if ctx.CacheItem, ok = ctx.Store.Get(ctx, ctx.Request); ok {
if !ctx.Cache.Add(ctx.CacheItem) {
if !ctx.Cache.AddIfNotPresent(ctx.CacheItem) {
// Someone else added a new token bucket item to the cache for this
// rate limit before we did, so we retry by calling ourselves recursively.
return tokenBucket(ctx)
Expand Down Expand Up @@ -270,7 +270,7 @@ func initTokenBucketItem(ctx rateContext) (resp *RateLimitResp, err error) {
Value: &t,
ExpireAt: expire,
}
if !ctx.Cache.Add(ctx.CacheItem) {
if !ctx.Cache.AddIfNotPresent(ctx.CacheItem) {
return rl, errAlreadyExistsInCache
}
}
Expand Down Expand Up @@ -299,7 +299,7 @@ func leakyBucket(ctx rateContext) (resp *RateLimitResp, err error) {
if ctx.Store != nil && !ok {
// Cache missed, check our store for the item.
if ctx.CacheItem, ok = ctx.Store.Get(ctx, ctx.Request); ok {
if !ctx.Cache.Add(ctx.CacheItem) {
if !ctx.Cache.AddIfNotPresent(ctx.CacheItem) {
// Someone else added a new leaky bucket item to the cache for this
// rate limit before we did, so we retry by calling ourselves recursively.
return leakyBucket(ctx)
Expand Down Expand Up @@ -519,7 +519,7 @@ func initLeakyBucketItem(ctx rateContext) (resp *RateLimitResp, err error) {
Key: ctx.Request.HashKey(),
Value: &b,
}
if !ctx.Cache.Add(ctx.CacheItem) {
if !ctx.Cache.AddIfNotPresent(ctx.CacheItem) {
return nil, errAlreadyExistsInCache
}
}
Expand Down
10 changes: 5 additions & 5 deletions benchmark_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func BenchmarkCache(b *testing.B) {
Value: "value:" + key,
ExpireAt: expire,
}
cache.Add(item)
cache.AddIfNotPresent(item)
}

mask := len(keys) - 1
Expand Down Expand Up @@ -78,7 +78,7 @@ func BenchmarkCache(b *testing.B) {
Value: "value:" + keys[index&mask],
ExpireAt: expire,
}
cache.Add(item)
cache.AddIfNotPresent(item)
}
})

Expand All @@ -94,7 +94,7 @@ func BenchmarkCache(b *testing.B) {
Value: "value:" + key,
ExpireAt: expire,
}
cache.Add(item)
cache.AddIfNotPresent(item)
}

var mutex sync.Mutex
Expand Down Expand Up @@ -144,7 +144,7 @@ func BenchmarkCache(b *testing.B) {
Value: "value:" + key,
ExpireAt: expire,
}
cache.Add(item)
cache.AddIfNotPresent(item)
}
} else {
task = func(key string) {
Expand All @@ -153,7 +153,7 @@ func BenchmarkCache(b *testing.B) {
Value: "value:" + key,
ExpireAt: expire,
}
cache.Add(item)
cache.AddIfNotPresent(item)
}
}

Expand Down
105 changes: 104 additions & 1 deletion cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,25 @@ limitations under the License.

package gubernator

import "sync"
import (
"sync"

"github.com/prometheus/client_golang/prometheus"

"github.com/mailgun/holster/v4/clock"
)

type Cache interface {
// Add adds an item, or replaces an item in the cache
//
// Deprecated: Gubernator algorithms now use AddIfNotExists.
// TODO: Remove this method in v3
Add(item *CacheItem) bool

// AddIfNotPresent adds the item to the cache if it doesn't already exist.
// Returns true if the item was added, false if the item already exists.
AddIfNotPresent(item *CacheItem) bool

GetItem(key string) (value *CacheItem, ok bool)
Each() chan *CacheItem
Remove(key string)
Expand Down Expand Up @@ -66,3 +81,91 @@ func (item *CacheItem) IsExpired() bool {

return false
}

func (item *CacheItem) Copy(from *CacheItem) {
item.mutex.Lock()
defer item.mutex.Unlock()

item.InvalidAt = from.InvalidAt
item.Algorithm = from.Algorithm
item.ExpireAt = from.ExpireAt
item.Value = from.Value
item.Key = from.Key
}

// MillisecondNow returns unix epoch in milliseconds
func MillisecondNow() int64 {
return clock.Now().UnixNano() / 1000000
}

type CacheStats struct {
Size int64
Hit int64
Miss int64
UnexpiredEvictions int64
}

// CacheCollector provides prometheus metrics collector for Cache implementations
// Register only one collector, add one or more caches to this collector.
type CacheCollector struct {
caches []Cache
metricSize prometheus.Gauge
metricAccess *prometheus.CounterVec
metricUnexpiredEvictions prometheus.Counter
}

func NewCacheCollector() *CacheCollector {
return &CacheCollector{
caches: []Cache{},
metricSize: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "gubernator_cache_size",
Help: "The number of items in LRU Cache which holds the rate limits.",
}),
metricAccess: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "gubernator_cache_access_count",
Help: "Cache access counts. Label \"type\" = hit|miss.",
}, []string{"type"}),
metricUnexpiredEvictions: prometheus.NewCounter(prometheus.CounterOpts{
Name: "gubernator_unexpired_evictions_count",
Help: "Count the number of cache items which were evicted while unexpired.",
}),
}
}

var _ prometheus.Collector = &CacheCollector{}

// AddCache adds a Cache object to be tracked by the collector.
func (c *CacheCollector) AddCache(cache Cache) {
c.caches = append(c.caches, cache)
}

// Describe fetches prometheus metrics to be registered
func (c *CacheCollector) Describe(ch chan<- *prometheus.Desc) {
c.metricSize.Describe(ch)
c.metricAccess.Describe(ch)
c.metricUnexpiredEvictions.Describe(ch)
}

// Collect fetches metric counts and gauges from the cache
func (c *CacheCollector) Collect(ch chan<- prometheus.Metric) {
stats := c.getStats()
c.metricSize.Set(float64(stats.Size))
c.metricSize.Collect(ch)
c.metricAccess.WithLabelValues("miss").Add(float64(stats.Miss))
c.metricAccess.WithLabelValues("hit").Add(float64(stats.Hit))
c.metricAccess.Collect(ch)
c.metricUnexpiredEvictions.Add(float64(stats.UnexpiredEvictions))
c.metricUnexpiredEvictions.Collect(ch)
}

func (c *CacheCollector) getStats() CacheStats {
var total CacheStats
for _, cache := range c.caches {
stats := cache.Stats()
total.Hit += stats.Hit
total.Miss += stats.Miss
total.Size += stats.Size
total.UnexpiredEvictions += stats.UnexpiredEvictions
}
return total
}
11 changes: 9 additions & 2 deletions cache_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,14 @@ func (m *cacheManager) Load(ctx context.Context) error {
case <-ctx.Done():
return ctx.Err()
}
_ = m.cache.Add(item)
retry:
if !m.cache.AddIfNotPresent(item) {
cItem, ok := m.cache.GetItem(item.Key)
if !ok {
goto retry
}
cItem.Copy(item)
}
}
}

Expand All @@ -160,6 +167,6 @@ func (m *cacheManager) GetCacheItem(_ context.Context, key string) (*CacheItem,
// AddCacheItem adds an item to the cache. The CacheItem.Key should be set correctly, else the item
// will not be added to the cache correctly.
func (m *cacheManager) AddCacheItem(_ context.Context, _ string, item *CacheItem) error {
_ = m.cache.Add(item)
_ = m.cache.AddIfNotPresent(item)
return nil
}
2 changes: 1 addition & 1 deletion cache_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func TestCacheManager(t *testing.T) {

// Mock Cache.
for _, item := range cacheItems {
mockCache.On("Add", item).Once().Return(false)
mockCache.On("AddIfNotPresent", item).Once().Return(true)
}

// Call code.
Expand Down
1 change: 1 addition & 0 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ func StartWith(localPeers []gubernator.PeerInfo) error {
GRPCListenAddress: peer.GRPCAddress,
HTTPListenAddress: peer.HTTPAddress,
DataCenter: peer.DataCenter,
CacheProvider: "otter",
Behaviors: gubernator.BehaviorConfig{
// Suitable for testing but not production
GlobalSyncWait: clock.Millisecond * 50,
Expand Down
6 changes: 6 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,9 @@ type DaemonConfig struct {
// (Optional) TraceLevel sets the tracing level, this controls the number of spans included in a single trace.
// Valid options are (tracing.InfoLevel, tracing.DebugLevel) Defaults to tracing.InfoLevel
TraceLevel tracing.Level

// (Optional) CacheProvider specifies which cache implementation to store rate limits in
CacheProvider string
}

func (d *DaemonConfig) ClientTLS() *tls.Config {
Expand Down Expand Up @@ -420,7 +423,10 @@ func SetupDaemonConfig(logger *logrus.Logger, configFile io.Reader) (DaemonConfi
setter.SetDefault(&conf.DNSPoolConf.ResolvConf, os.Getenv("GUBER_RESOLV_CONF"), "/etc/resolv.conf")
setter.SetDefault(&conf.DNSPoolConf.OwnAddress, conf.AdvertiseAddress)

setter.SetDefault(&conf.CacheProvider, os.Getenv("GUBER_CACHE_PROVIDER"), "default-lru")

// PeerPicker Config
// TODO: Deprecated: Remove in GUBER_PEER_PICKER in v3
if pp := os.Getenv("GUBER_PEER_PICKER"); pp != "" {
var replicas int
var hash string
Expand Down
23 changes: 16 additions & 7 deletions daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,23 @@ func (s *Daemon) Start(ctx context.Context) error {
}

cacheFactory := func(maxSize int) (Cache, error) {
//cache := NewLRUCache(maxSize)
// TODO: Enable Otter as default or provide a config option
cache, err := NewOtterCache(maxSize)
if err != nil {
return nil, err
// TODO(thrawn01): Make Otter the default in gubernator V3
switch s.conf.CacheProvider {
case "otter":
cache, err := NewOtterCache(maxSize)
if err != nil {
return nil, err
}
cacheCollector.AddCache(cache)
return cache, nil
case "default-lru", "":
cache := NewLRUMutexCache(maxSize)
cacheCollector.AddCache(cache)
return cache, nil
default:
return nil, errors.Errorf("'GUBER_CACHE_PROVIDER=%s' is invalid; "+
"choices are ['otter', 'default-lru']", s.conf.CacheProvider)
}
cacheCollector.AddCache(cache)
return cache, nil
}

// Handler to collect duration and API access metrics for GRPC
Expand Down
Loading

0 comments on commit cca0df8

Please sign in to comment.