Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TT-13939] Embed memorycache, drop leakybucket import #6843

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 4 additions & 7 deletions gateway/session_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ import (
"github.com/sirupsen/logrus"

"github.com/TykTechnologies/drl"
"github.com/TykTechnologies/leakybucket"
"github.com/TykTechnologies/leakybucket/memorycache"

"github.com/TykTechnologies/tyk/config"
"github.com/TykTechnologies/tyk/internal/httputil"
"github.com/TykTechnologies/tyk/internal/memorycache"
"github.com/TykTechnologies/tyk/internal/model"
"github.com/TykTechnologies/tyk/internal/rate"
"github.com/TykTechnologies/tyk/internal/rate/limiter"
"github.com/TykTechnologies/tyk/internal/redis"
Expand Down Expand Up @@ -53,14 +53,11 @@ type SessionLimiter struct {
ctx context.Context
drlManager *drl.DRL
config *config.Config
bucketStore leakybucket.Storage
bucketStore model.BucketStorage
limiterStorage redis.UniversalClient
smoothing *rate.Smoothing
}

// Encourage reuse in NewSessionLimiter.
var sessionLimiterBucketStore = memorycache.New()

// NewSessionLimiter initializes the session limiter.
//
// The session limiter initializes the storage required for rate limiters.
Expand All @@ -73,7 +70,7 @@ func NewSessionLimiter(ctx context.Context, conf *config.Config, drlManager *drl
ctx: ctx,
drlManager: drlManager,
config: conf,
bucketStore: sessionLimiterBucketStore,
bucketStore: memorycache.New(ctx),
}

log.Infof("[RATELIMIT] %s", conf.RateLimit.String())
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ require (
github.com/TykTechnologies/goverify v0.0.0-20220808203004-1486f89e7708
github.com/TykTechnologies/graphql-go-tools v1.6.2-0.20241212110213-7724a3b64bb2
github.com/TykTechnologies/graphql-translator v0.0.0-20240319092712-4ba87e4c06ff
github.com/TykTechnologies/leakybucket v0.0.0-20170301023702-71692c943e3c
github.com/TykTechnologies/murmur3 v0.0.0-20230310161213-aad17efd5632
github.com/TykTechnologies/openid2go v0.1.2
github.com/TykTechnologies/storage v1.2.2
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,6 @@ github.com/TykTechnologies/graphql-translator v0.0.0-20240319092712-4ba87e4c06ff
github.com/TykTechnologies/graphql-translator v0.0.0-20240319092712-4ba87e4c06ff/go.mod h1:K3KhGG9CmvXv1lJhKZpnLb1tC8N1oIzXTunQsc6N9og=
github.com/TykTechnologies/kin-openapi v0.90.0 h1:kHw0mtANwIpmlU6eCeeCgRMa52EiPPhEZPuHc3lKawo=
github.com/TykTechnologies/kin-openapi v0.90.0/go.mod h1:pkzuiceujHvAuDu3bTD/AD5OacuP4eMfrz9QhJlMvdQ=
github.com/TykTechnologies/leakybucket v0.0.0-20170301023702-71692c943e3c h1:j6fd0Fz1R4oSWOmcooGjrdahqrML+btQ+PfEJw8SzbA=
github.com/TykTechnologies/leakybucket v0.0.0-20170301023702-71692c943e3c/go.mod h1:GnHUbsQx+ysI10osPhUdTmsxcE7ef64cVp38Fdyd7e0=
github.com/TykTechnologies/murmur3 v0.0.0-20230310161213-aad17efd5632 h1:T5NWziFusj8au5nxAqMMh/bZyX9CAyYnBkaMSsfH6BA=
github.com/TykTechnologies/murmur3 v0.0.0-20230310161213-aad17efd5632/go.mod h1:UsPYgOFBpNzDXLEti7MKOwHLpVSqdzuNGkVFPspQmnQ=
github.com/TykTechnologies/openid2go v0.1.2 h1:WXctksOahA/epTVVvbn9iNUuMXKRr0ksrF4dY9KW8o8=
Expand Down
45 changes: 45 additions & 0 deletions internal/memorycache/bucket.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package memorycache

import (
"sync"
"time"

"github.com/TykTechnologies/tyk/internal/model"
)

type Bucket struct {
capacity uint
remaining uint
reset time.Time
rate time.Duration
mutex sync.Mutex
}

func (b *Bucket) Capacity() uint {
return b.capacity
}

// Remaining space in the bucket.
func (b *Bucket) Remaining() uint {
return b.remaining
}

// Reset returns when the bucket will be drained.
func (b *Bucket) Reset() time.Time {
return b.reset
}

// Add to the bucket.
func (b *Bucket) Add(amount uint) (model.BucketState, error) {
b.mutex.Lock()
defer b.mutex.Unlock()
if time.Now().After(b.reset) {
b.reset = time.Now().Add(b.rate)
b.remaining = b.capacity
}
if amount > b.remaining {
return model.BucketState{Capacity: b.capacity, Remaining: b.remaining, Reset: b.reset}, model.ErrBucketFull
}
b.remaining -= amount
return model.BucketState{Capacity: b.capacity, Remaining: b.remaining, Reset: b.reset}, nil
}
99 changes: 99 additions & 0 deletions internal/memorycache/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package memorycache

import (
"context"
"sync"
"time"
)

// Cache is a synchronised map of items that auto-expire once stale
type Cache struct {
mutex sync.RWMutex
ttl time.Duration
items map[string]*Item
}

// NewCache is a helper to create instance of the Cache struct.
// The ctx is used to cancel the TTL map cleanup goroutine.
func NewCache(ctx context.Context, duration time.Duration) *Cache {
cache := &Cache{
ttl: duration,
items: map[string]*Item{},
}
go cache.startCleanupTimer(ctx)
return cache
}

// Set is a thread-safe way to add new items to the map
func (cache *Cache) Set(key string, data *Bucket) {
cache.mutex.Lock()
item := &Item{data: data}
item.touch(cache.ttl)
cache.items[key] = item
cache.mutex.Unlock()
}

// Get is a thread-safe way to lookup items
// Every lookup, also touches the item, hence extending it's life
func (cache *Cache) Get(key string) (data *Bucket, found bool) {
cache.mutex.Lock()
item, exists := cache.items[key]
if !exists || item.expired() {
data = &Bucket{}
found = false
} else {
item.touch(cache.ttl)
data = item.data
found = true
}
cache.mutex.Unlock()
return
}

// Count returns the number of items in the cache
// (helpful for tracking memory leaks)
func (cache *Cache) Count() int {
cache.mutex.RLock()
count := len(cache.items)
cache.mutex.RUnlock()
return count
}

func (cache *Cache) cleanup() {
cache.mutex.Lock()
for key, item := range cache.items {
if item.expired() {
delete(cache.items, key)
}
}
cache.mutex.Unlock()
}

func (cache *Cache) clear() {
cache.mutex.Lock()
cache.items = map[string]*Item{}
cache.mutex.Unlock()
}

func (cache *Cache) startCleanupTimer(ctx context.Context) {
interval := cache.ttl
if interval < time.Second {
interval = time.Second
}

ticker := time.NewTicker(interval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
//fmt.Println("Shutting down cleanup timer:", ctx.Err())
goto done
case <-ticker.C:
cache.cleanup()
}
break
}
done:
cache.clear()
}
29 changes: 29 additions & 0 deletions internal/memorycache/cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package memorycache

import (
"context"
"runtime"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestCache_Shutdown(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())

g1 := runtime.NumGoroutine()

cache := NewCache(ctx, time.Minute)
cache.Set("default", nil)
assert.Equal(t, 1, cache.Count(), "added an item here")

g2 := runtime.NumGoroutine()

cancel()
runtime.GC()
time.Sleep(10 * time.Millisecond)

assert.True(t, g1+1 == g2, "goroutine should increase by one, got %d => %d", g1, g2)
assert.Equal(t, 0, cache.Count(), "we cleared the cache on shutdown")
}
32 changes: 32 additions & 0 deletions internal/memorycache/item.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package memorycache

import (
"sync"
"time"
)

// Item represents a record in the cache map.
type Item struct {
sync.RWMutex
data *Bucket
expires *time.Time
}

func (item *Item) touch(duration time.Duration) {
item.Lock()
expiration := time.Now().Add(duration)
item.expires = &expiration
item.Unlock()
}

func (item *Item) expired() bool {
var value bool
item.RLock()
if item.expires == nil {
value = true
} else {
value = item.expires.Before(time.Now())
}
item.RUnlock()
return value
}
37 changes: 37 additions & 0 deletions internal/memorycache/storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package memorycache

import (
"context"
"time"

"github.com/TykTechnologies/tyk/internal/model"
)

// BucketStorage is a non thread-safe in-memory leaky bucket factory.
type BucketStorage struct {
buckets *Cache
}

// New initializes the in-memory bucket store.
func New(ctx context.Context) *BucketStorage {
return &BucketStorage{
buckets: NewCache(ctx, 10*time.Minute),
}
}

// Create a bucket.
func (s *BucketStorage) Create(name string, capacity uint, rate time.Duration) (model.Bucket, error) {
b, ok := s.buckets.Get(name)
if ok {
return b, nil
}

b = &Bucket{
capacity: capacity,
remaining: capacity,
reset: time.Now().Add(rate),
rate: rate,
}
s.buckets.Set(name, b)
return b, nil
}
40 changes: 40 additions & 0 deletions internal/model/bucket.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package model

import (
"errors"
"time"
)

var (
// ErrBucketFull is returned when the amount requested to add exceeds the remaining space in the bucket.
ErrBucketFull = errors.New("add exceeds free bucket capacity")
)

// Bucket interface for interacting with leaky buckets: https://en.wikipedia.org/wiki/Leaky_bucket
type Bucket interface {
// Capacity of the bucket.
Capacity() uint

// Remaining space in the bucket.
Remaining() uint

// Reset returns when the bucket will be drained.
Reset() time.Time

// Add to the bucket. Returns bucket state after adding.
Add(uint) (BucketState, error)
}

// BucketState is a snapshot of a bucket's properties.
type BucketState struct {
Capacity uint
Remaining uint
Reset time.Time
}

// BucketStorage interface for generating buckets keyed by a string.
type BucketStorage interface {
// Create a bucket with a name, capacity, and rate.
// rate is how long it takes for full capacity to drain.
Create(name string, capacity uint, rate time.Duration) (Bucket, error)
}
2 changes: 2 additions & 0 deletions internal/model/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
// Package model provides an internal data model for use across the gateway.
package model
Loading