Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Otter is now available as a cache option #11

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 70 additions & 47 deletions benchmark_cache_test.go
Original file line number Diff line number Diff line change
@@ -1,160 +1,183 @@
package gubernator_test

import (
"strconv"
"math/rand"
"sync"
"testing"
"time"

"github.com/gubernator-io/gubernator/v2"
"github.com/mailgun/holster/v4/clock"
"github.com/stretchr/testify/require"
)

func BenchmarkCache(b *testing.B) {
testCases := []struct {
Name string
NewTestCache func() gubernator.Cache
NewTestCache func() (gubernator.Cache, error)
LockRequired bool
}{
{
Name: "LRUCache",
NewTestCache: func() gubernator.Cache {
return gubernator.NewLRUCache(0)
NewTestCache: func() (gubernator.Cache, error) {
return gubernator.NewLRUCache(0), nil
},
LockRequired: true,
},
{
Name: "OtterCache",
NewTestCache: func() (gubernator.Cache, error) {
return gubernator.NewOtterCache(0)
},
LockRequired: false,
},
}

for _, testCase := range testCases {
b.Run(testCase.Name, func(b *testing.B) {
b.Run("Sequential reads", func(b *testing.B) {
cache := testCase.NewTestCache()
cache, err := testCase.NewTestCache()
require.NoError(b, err)
expire := clock.Now().Add(time.Hour).UnixMilli()
keys := GenerateRandomKeys()

for i := 0; i < b.N; i++ {
key := strconv.Itoa(i)
for _, key := range keys {
item := &gubernator.CacheItem{
Key: key,
Value: i,
Value: "value:" + key,
ExpireAt: expire,
}
cache.Add(item)
}

mask := len(keys) - 1
b.ReportAllocs()
b.ResetTimer()

for i := 0; i < b.N; i++ {
key := strconv.Itoa(i)
_, _ = cache.GetItem(key)
index := int(rand.Uint32() & uint32(mask))
_, _ = cache.GetItem(keys[index&mask])
}
})

b.Run("Sequential writes", func(b *testing.B) {
cache := testCase.NewTestCache()
cache, err := testCase.NewTestCache()
require.NoError(b, err)
expire := clock.Now().Add(time.Hour).UnixMilli()
keys := GenerateRandomKeys()

mask := len(keys) - 1
b.ReportAllocs()
b.ResetTimer()

for i := 0; i < b.N; i++ {
index := int(rand.Uint32() & uint32(mask))
item := &gubernator.CacheItem{
Key: strconv.Itoa(i),
Value: i,
Key: keys[index&mask],
Value: "value:" + keys[index&mask],
ExpireAt: expire,
}
cache.Add(item)
}
})

b.Run("Concurrent reads", func(b *testing.B) {
cache := testCase.NewTestCache()
cache, err := testCase.NewTestCache()
require.NoError(b, err)
expire := clock.Now().Add(time.Hour).UnixMilli()
keys := GenerateRandomKeys()

for i := 0; i < b.N; i++ {
key := strconv.Itoa(i)
for _, key := range keys {
item := &gubernator.CacheItem{
Key: key,
Value: i,
Value: "value:" + key,
ExpireAt: expire,
}
cache.Add(item)
}

var wg sync.WaitGroup
var mutex sync.Mutex
var task func(i int)
var task func(key string)

if testCase.LockRequired {
task = func(i int) {
task = func(key string) {
mutex.Lock()
defer mutex.Unlock()
key := strconv.Itoa(i)
_, _ = cache.GetItem(key)
wg.Done()
}
} else {
task = func(i int) {
key := strconv.Itoa(i)
task = func(key string) {
_, _ = cache.GetItem(key)
wg.Done()
}
}

b.ReportAllocs()
b.ResetTimer()

for i := 0; i < b.N; i++ {
wg.Add(1)
go task(i)
}
mask := len(keys) - 1

b.RunParallel(func(pb *testing.PB) {
index := int(rand.Uint32() & uint32(mask))
for pb.Next() {
task(keys[index&mask])
}
})

wg.Wait()
})

b.Run("Concurrent writes", func(b *testing.B) {
cache := testCase.NewTestCache()
cache, err := testCase.NewTestCache()
require.NoError(b, err)
expire := clock.Now().Add(time.Hour).UnixMilli()
keys := GenerateRandomKeys()

var wg sync.WaitGroup
var mutex sync.Mutex
var task func(i int)
var task func(key string)

if testCase.LockRequired {
task = func(i int) {
task = func(key string) {
mutex.Lock()
defer mutex.Unlock()
item := &gubernator.CacheItem{
Key: strconv.Itoa(i),
Value: i,
Key: key,
Value: "value:" + key,
ExpireAt: expire,
}
cache.Add(item)
wg.Done()
}
} else {
task = func(i int) {
task = func(key string) {
item := &gubernator.CacheItem{
Key: strconv.Itoa(i),
Value: i,
Key: key,
Value: "value:" + key,
ExpireAt: expire,
}
cache.Add(item)
wg.Done()
}
}

mask := len(keys) - 1
b.ReportAllocs()
b.ResetTimer()

for i := 0; i < b.N; i++ {
wg.Add(1)
go task(i)
}

wg.Wait()
b.RunParallel(func(pb *testing.PB) {
index := int(rand.Uint32() & uint32(mask))
for pb.Next() {
task(keys[index&mask])
}
})
})

})
}
}

const cacheSize = 32768

func GenerateRandomKeys() []string {
keys := make([]string, 0, cacheSize)
for i := 0; i < cacheSize; i++ {
keys = append(keys, gubernator.RandomString(20))
}
return keys
}
153 changes: 153 additions & 0 deletions cache_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
Copyright 2024 Derrick J. Wippler

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package gubernator

import (
"context"
"sync"

"github.com/pkg/errors"
)

type CacheManager interface {
GetRateLimit(context.Context, *RateLimitReq, RateLimitReqState) (*RateLimitResp, error)
GetCacheItem(context.Context, string) (*CacheItem, bool, error)
AddCacheItem(context.Context, string, *CacheItem) error
Store(ctx context.Context) error
Load(context.Context) error
Close() error
}

type cacheManager struct {
conf Config
cache Cache
}

// NewCacheManager creates a new instance of the CacheManager interface using
// the cache returned by Config.CacheFactory
func NewCacheManager(conf Config) (CacheManager, error) {

cache, err := conf.CacheFactory(conf.CacheSize)
if err != nil {
return nil, err
}
return &cacheManager{
cache: cache,
conf: conf,
}, nil
}

// GetRateLimit fetches the item from the cache if it exists, and preforms the appropriate rate limit calculation
func (m *cacheManager) GetRateLimit(ctx context.Context, req *RateLimitReq, state RateLimitReqState) (*RateLimitResp, error) {
var rlResponse *RateLimitResp
var err error

switch req.Algorithm {
case Algorithm_TOKEN_BUCKET:
rlResponse, err = tokenBucket(ctx, m.conf.Store, m.cache, req, state)
if err != nil {
msg := "Error in tokenBucket"
countError(err, msg)
}

case Algorithm_LEAKY_BUCKET:
rlResponse, err = leakyBucket(ctx, m.conf.Store, m.cache, req, state)
if err != nil {
msg := "Error in leakyBucket"
countError(err, msg)
}

default:
err = errors.Errorf("Invalid rate limit algorithm '%d'", req.Algorithm)
}

return rlResponse, err
}

// Store saves every cache item into persistent storage provided via Config.Loader
func (m *cacheManager) Store(ctx context.Context) error {
out := make(chan *CacheItem, 500)
var wg sync.WaitGroup

wg.Add(1)
go func() {
defer wg.Done()
for item := range m.cache.Each() {
select {
case out <- item:

case <-ctx.Done():
return
}
}
}()

go func() {
wg.Wait()
close(out)
}()

if ctx.Err() != nil {
return ctx.Err()
}

if err := m.conf.Loader.Save(out); err != nil {
return errors.Wrap(err, "while calling p.conf.Loader.Save()")
}
return nil
}

// Close closes the cache manager
func (m *cacheManager) Close() error {
return m.cache.Close()
}

// Load cache items from persistent storage provided via Config.Loader
func (m *cacheManager) Load(ctx context.Context) error {
ch, err := m.conf.Loader.Load()
if err != nil {
return errors.Wrap(err, "Error in loader.Load")
}

for {
var item *CacheItem
var ok bool

select {
case item, ok = <-ch:
if !ok {
return nil
}
case <-ctx.Done():
return ctx.Err()
}
_ = m.cache.Add(item)
}
}

// GetCacheItem returns an item from the cache
func (m *cacheManager) GetCacheItem(_ context.Context, key string) (*CacheItem, bool, error) {
item, ok := m.cache.GetItem(key)
return item, ok, nil
}

// AddCacheItem adds an item to the cache. The CacheItem.Key should be set correctly, else the item
// will not be added to the cache correctly.
func (m *cacheManager) AddCacheItem(_ context.Context, _ string, item *CacheItem) error {
_ = m.cache.Add(item)
return nil
}
Loading
Loading