Skip to content

Commit

Permalink
ratelimit: per descriptor hits addend support and prefer uint64 (#802)
Browse files Browse the repository at this point in the history
* ratelimit: per descriptor hits addend support and prefer uint64

Signed-off-by: wangbaiping/wbpcode <[email protected]>

* fix some tests

Signed-off-by: wangbaiping/wbpcode <[email protected]>

* refactor and fix

Signed-off-by: wangbaiping/wbpcode <[email protected]>

* more different tests

Signed-off-by: wangbaiping/wbpcode <[email protected]>

* remove max

Signed-off-by: wangbaiping/wbpcode <[email protected]>

---------

Signed-off-by: wangbaiping/wbpcode <[email protected]>
  • Loading branch information
wbpcode authored Dec 27, 2024
1 parent 7475d46 commit 38500fe
Show file tree
Hide file tree
Showing 11 changed files with 235 additions and 194 deletions.
10 changes: 5 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
module github.com/envoyproxy/ratelimit

go 1.21.11
go 1.22.8

require (
github.com/DataDog/datadog-go/v5 v5.5.0
github.com/alicebob/miniredis/v2 v2.33.0
github.com/bradfitz/gomemcache v0.0.0-20230905024940-24af94b03874
github.com/coocood/freecache v1.2.4
github.com/envoyproxy/go-control-plane v0.12.1-0.20240123181358-841e293a220b
github.com/envoyproxy/go-control-plane v0.13.2-0.20241219025321-f011ad88ec17
github.com/go-kit/log v0.2.1
github.com/golang/mock v1.6.0
github.com/google/uuid v1.6.0
Expand All @@ -23,7 +23,7 @@ require (
github.com/prometheus/client_model v0.6.0
github.com/prometheus/statsd_exporter v0.26.1
github.com/sirupsen/logrus v1.9.3
github.com/stretchr/testify v1.9.0
github.com/stretchr/testify v1.10.0
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.53.0
go.opentelemetry.io/otel v1.28.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0
Expand All @@ -47,14 +47,14 @@ require (
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cncf/xds/go v0.0.0-20240423153145-555b57ec207b // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/envoyproxy/protoc-gen-validate v1.0.4 // indirect
github.com/envoyproxy/protoc-gen-validate v1.1.0 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/go-logfmt/logfmt v0.6.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 // indirect
github.com/planetscale/vtprotobuf v0.5.1-0.20231212170721-e7d721933795 // indirect
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/common v0.48.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
Expand Down
10 changes: 10 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,15 @@ github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.m
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
github.com/envoyproxy/go-control-plane v0.12.1-0.20240123181358-841e293a220b h1:M0BhcNaW04UV1haQO8IFSDB64dAeiBSsTMZks/sYDcQ=
github.com/envoyproxy/go-control-plane v0.12.1-0.20240123181358-841e293a220b/go.mod h1:lFu6itz1hckLR2A3aJ+ZKf3lu8HpjTsJSsqvVF6GL6g=
github.com/envoyproxy/go-control-plane v0.13.1 h1:vPfJZCkob6yTMEgS+0TwfTUfbHjfy/6vOJ8hUWX/uXE=
github.com/envoyproxy/go-control-plane v0.13.1/go.mod h1:X45hY0mufo6Fd0KW3rqsGvQMw58jvjymeCzBU3mWyHw=
github.com/envoyproxy/go-control-plane v0.13.2-0.20241219025321-f011ad88ec17 h1:vJbk97KFgBX0QdyydT18FDmwqCeRZzUYUdm/o338h8I=
github.com/envoyproxy/go-control-plane v0.13.2-0.20241219025321-f011ad88ec17/go.mod h1:lHUJZHyVI6Q4Vr6qjD60ZHBybFRLzqoKVZGIJi0/i8s=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/envoyproxy/protoc-gen-validate v1.0.4 h1:gVPz/FMfvh57HdSJQyvBtF00j8JU4zdyUgIUNhlgg0A=
github.com/envoyproxy/protoc-gen-validate v1.0.4/go.mod h1:qys6tmnRsYrQqIhm2bvKZH4Blx/1gTIZ2UKVY1M+Yew=
github.com/envoyproxy/protoc-gen-validate v1.1.0 h1:tntQDh69XqOCOZsDz0lVJQez/2L6Uu2PdjCQwWCJ3bM=
github.com/envoyproxy/protoc-gen-validate v1.1.0/go.mod h1:sXRDRVmzEbkM7CVcM06s9shE/m23dg3wzjl0UWqJ2q4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
Expand Down Expand Up @@ -107,6 +113,8 @@ github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/planetscale/vtprotobuf v0.5.1-0.20231212170721-e7d721933795 h1:pH+U6pJP0BhxqQ4njBUjOg0++WMMvv3eByWzB+oATBY=
github.com/planetscale/vtprotobuf v0.5.1-0.20231212170721-e7d721933795/go.mod h1:t/avpk3KcrXxUnYOhZhMXJlSEyie6gQbtLq5NM3loB8=
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 h1:GFCKgmp0tecUJ0sJuv4pzYCqS9+RGSn52M3FUwPs+uo=
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10/go.mod h1:t/avpk3KcrXxUnYOhZhMXJlSEyie6gQbtLq5NM3loB8=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v1.19.1 h1:wZWJDwK+NameRJuPGDhlnFgx8e8HN3XHQeLaYJFJBOE=
Expand Down Expand Up @@ -142,6 +150,8 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
Expand Down
46 changes: 23 additions & 23 deletions src/limiter/base_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ type BaseRateLimiter struct {

type LimitInfo struct {
limit *config.RateLimit
limitBeforeIncrease uint32
limitAfterIncrease uint32
nearLimitThreshold uint32
overLimitThreshold uint32
limitBeforeIncrease uint64
limitAfterIncrease uint64
nearLimitThreshold uint64
overLimitThreshold uint64
}

func NewRateLimitInfo(limit *config.RateLimit, limitBeforeIncrease uint32, limitAfterIncrease uint32,
nearLimitThreshold uint32, overLimitThreshold uint32,
func NewRateLimitInfo(limit *config.RateLimit, limitBeforeIncrease uint64, limitAfterIncrease uint64,
nearLimitThreshold uint64, overLimitThreshold uint64,
) *LimitInfo {
return &LimitInfo{
limit: limit, limitBeforeIncrease: limitBeforeIncrease, limitAfterIncrease: limitAfterIncrease,
Expand All @@ -44,7 +44,7 @@ func NewRateLimitInfo(limit *config.RateLimit, limitBeforeIncrease uint32, limit
// Generates cache keys for given rate limit request. Each cache key is represented by a concatenation of
// domain, descriptor and current timestamp.
func (this *BaseRateLimiter) GenerateCacheKeys(request *pb.RateLimitRequest,
limits []*config.RateLimit, hitsAddend uint32,
limits []*config.RateLimit, hitsAddends []uint64,
) []CacheKey {
assert.Assert(len(request.Descriptors) == len(limits))
cacheKeys := make([]CacheKey, len(request.Descriptors))
Expand All @@ -55,7 +55,7 @@ func (this *BaseRateLimiter) GenerateCacheKeys(request *pb.RateLimitRequest,
cacheKeys[i] = this.cacheKeyGenerator.GenerateCacheKey(request.Domain, request.Descriptors[i], limits[i], now)
// Increase statistics for limits hit by their respective requests.
if limits[i] != nil {
limits[i].Stats.TotalHits.Add(uint64(hitsAddend))
limits[i].Stats.TotalHits.Add(hitsAddends[i])
}
}
return cacheKeys
Expand All @@ -74,14 +74,14 @@ func (this *BaseRateLimiter) IsOverLimitWithLocalCache(key string) bool {
}

func (this *BaseRateLimiter) IsOverLimitThresholdReached(limitInfo *LimitInfo) bool {
limitInfo.overLimitThreshold = limitInfo.limit.Limit.RequestsPerUnit
limitInfo.overLimitThreshold = uint64(limitInfo.limit.Limit.RequestsPerUnit)
return limitInfo.limitAfterIncrease > limitInfo.overLimitThreshold
}

// Generates response descriptor status based on cache key, over the limit with local cache, over the limit and
// near the limit thresholds. Thresholds are checked in order and are mutually exclusive.
func (this *BaseRateLimiter) GetResponseDescriptorStatus(key string, limitInfo *LimitInfo,
isOverLimitWithLocalCache bool, hitsAddend uint32,
isOverLimitWithLocalCache bool, hitsAddend uint64,
) *pb.RateLimitResponse_DescriptorStatus {
if key == "" {
return this.generateResponseDescriptorStatus(pb.RateLimitResponse_OK,
Expand All @@ -91,15 +91,15 @@ func (this *BaseRateLimiter) GetResponseDescriptorStatus(key string, limitInfo *
isOverLimit := false
if isOverLimitWithLocalCache {
isOverLimit = true
limitInfo.limit.Stats.OverLimit.Add(uint64(hitsAddend))
limitInfo.limit.Stats.OverLimitWithLocalCache.Add(uint64(hitsAddend))
limitInfo.limit.Stats.OverLimit.Add(hitsAddend)
limitInfo.limit.Stats.OverLimitWithLocalCache.Add(hitsAddend)
responseDescriptorStatus = this.generateResponseDescriptorStatus(pb.RateLimitResponse_OVER_LIMIT,
limitInfo.limit.Limit, 0)
} else {
limitInfo.overLimitThreshold = limitInfo.limit.Limit.RequestsPerUnit
limitInfo.overLimitThreshold = uint64(limitInfo.limit.Limit.RequestsPerUnit)
// The nearLimitThreshold is the number of requests that can be made before hitting the nearLimitRatio.
// We need to know it in both the OK and OVER_LIMIT scenarios.
limitInfo.nearLimitThreshold = uint32(math.Floor(float64(float32(limitInfo.overLimitThreshold) * this.nearLimitRatio)))
limitInfo.nearLimitThreshold = uint64(math.Floor(float64(float32(limitInfo.overLimitThreshold) * this.nearLimitRatio)))
logger.Debugf("cache key: %s current: %d", key, limitInfo.limitAfterIncrease)
if limitInfo.limitAfterIncrease > limitInfo.overLimitThreshold {
isOverLimit = true
Expand All @@ -123,7 +123,7 @@ func (this *BaseRateLimiter) GetResponseDescriptorStatus(key string, limitInfo *
}
} else {
responseDescriptorStatus = this.generateResponseDescriptorStatus(pb.RateLimitResponse_OK,
limitInfo.limit.Limit, limitInfo.overLimitThreshold-limitInfo.limitAfterIncrease)
limitInfo.limit.Limit, uint32(limitInfo.overLimitThreshold-limitInfo.limitAfterIncrease))

// The limit is OK but we additionally want to know if we are near the limit.
this.checkNearLimitThreshold(limitInfo, hitsAddend)
Expand Down Expand Up @@ -156,38 +156,38 @@ func NewBaseRateLimit(timeSource utils.TimeSource, jitterRand *rand.Rand, expira
}
}

func (this *BaseRateLimiter) checkOverLimitThreshold(limitInfo *LimitInfo, hitsAddend uint32) {
func (this *BaseRateLimiter) checkOverLimitThreshold(limitInfo *LimitInfo, hitsAddend uint64) {
// Increase over limit statistics. Because we support += behavior for increasing the limit, we need to
// assess if the entire hitsAddend were over the limit. That is, if the limit's value before adding the
// N hits was over the limit, then all the N hits were over limit.
// Otherwise, only the difference between the current limit value and the over limit threshold
// were over limit hits.
if limitInfo.limitBeforeIncrease >= limitInfo.overLimitThreshold {
limitInfo.limit.Stats.OverLimit.Add(uint64(hitsAddend))
limitInfo.limit.Stats.OverLimit.Add(hitsAddend)
} else {
limitInfo.limit.Stats.OverLimit.Add(uint64(limitInfo.limitAfterIncrease - limitInfo.overLimitThreshold))
limitInfo.limit.Stats.OverLimit.Add(limitInfo.limitAfterIncrease - limitInfo.overLimitThreshold)

// If the limit before increase was below the over limit value, then some of the hits were
// in the near limit range.
limitInfo.limit.Stats.NearLimit.Add(uint64(limitInfo.overLimitThreshold - utils.Max(limitInfo.nearLimitThreshold, limitInfo.limitBeforeIncrease)))
limitInfo.limit.Stats.NearLimit.Add(limitInfo.overLimitThreshold - max(limitInfo.nearLimitThreshold, limitInfo.limitBeforeIncrease))
}
}

func (this *BaseRateLimiter) checkNearLimitThreshold(limitInfo *LimitInfo, hitsAddend uint32) {
func (this *BaseRateLimiter) checkNearLimitThreshold(limitInfo *LimitInfo, hitsAddend uint64) {
if limitInfo.limitAfterIncrease > limitInfo.nearLimitThreshold {
// Here we also need to assess which portion of the hitsAddend were in the near limit range.
// If all the hits were over the nearLimitThreshold, then all hits are near limit. Otherwise,
// only the difference between the current limit value and the near limit threshold were near
// limit hits.
if limitInfo.limitBeforeIncrease >= limitInfo.nearLimitThreshold {
limitInfo.limit.Stats.NearLimit.Add(uint64(hitsAddend))
limitInfo.limit.Stats.NearLimit.Add(hitsAddend)
} else {
limitInfo.limit.Stats.NearLimit.Add(uint64(limitInfo.limitAfterIncrease - limitInfo.nearLimitThreshold))
limitInfo.limit.Stats.NearLimit.Add(limitInfo.limitAfterIncrease - limitInfo.nearLimitThreshold)
}
}
}

func (this *BaseRateLimiter) increaseShadowModeStats(isOverLimitWithLocalCache bool, limitInfo *LimitInfo, hitsAddend uint32) {
func (this *BaseRateLimiter) increaseShadowModeStats(isOverLimitWithLocalCache bool, limitInfo *LimitInfo, hitsAddend uint64) {
// Increase shadow mode statistics. For the same reason as over limit stats,
// if the limit value before adding the N hits over the limit, then all N hits were over limit.
if isOverLimitWithLocalCache || limitInfo.limitBeforeIncrease >= limitInfo.overLimitThreshold {
Expand Down
22 changes: 11 additions & 11 deletions src/memcached/cache_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,10 @@ func (this *rateLimitMemcacheImpl) DoLimit(
logger.Debugf("starting cache lookup")

// request.HitsAddend could be 0 (default value) if not specified by the caller in the Ratelimit request.
hitsAddend := utils.Max(1, request.HitsAddend)
hitsAddends := utils.GetHitsAddends(request)

// First build a list of all cache keys that we are actually going to hit.
cacheKeys := this.baseRateLimiter.GenerateCacheKeys(request, limits, hitsAddend)
cacheKeys := this.baseRateLimiter.GenerateCacheKeys(request, limits, hitsAddends)

isOverLimitWithLocalCache := make([]bool, len(request.Descriptors))

Expand Down Expand Up @@ -121,27 +121,27 @@ func (this *rateLimitMemcacheImpl) DoLimit(
for i, cacheKey := range cacheKeys {

rawMemcacheValue, ok := memcacheValues[cacheKey.Key]
var limitBeforeIncrease uint32
var limitBeforeIncrease uint64
if ok {
decoded, err := strconv.ParseInt(string(rawMemcacheValue.Value), 10, 32)
if err != nil {
logger.Errorf("Unexpected non-numeric value in memcached: %v", rawMemcacheValue)
} else {
limitBeforeIncrease = uint32(decoded)
limitBeforeIncrease = uint64(decoded)
}

}

limitAfterIncrease := limitBeforeIncrease + hitsAddend
limitAfterIncrease := limitBeforeIncrease + hitsAddends[i]

limitInfo := limiter.NewRateLimitInfo(limits[i], limitBeforeIncrease, limitAfterIncrease, 0, 0)

responseDescriptorStatuses[i] = this.baseRateLimiter.GetResponseDescriptorStatus(cacheKey.Key,
limitInfo, isOverLimitWithLocalCache[i], hitsAddend)
limitInfo, isOverLimitWithLocalCache[i], hitsAddends[i])
}

this.waitGroup.Add(1)
runAsync(func() { this.increaseAsync(cacheKeys, isOverLimitWithLocalCache, limits, uint64(hitsAddend)) })
runAsync(func() { this.increaseAsync(cacheKeys, isOverLimitWithLocalCache, limits, hitsAddends) })
if AutoFlushForIntegrationTests {
this.Flush()
}
Expand All @@ -150,15 +150,15 @@ func (this *rateLimitMemcacheImpl) DoLimit(
}

func (this *rateLimitMemcacheImpl) increaseAsync(cacheKeys []limiter.CacheKey, isOverLimitWithLocalCache []bool,
limits []*config.RateLimit, hitsAddend uint64,
limits []*config.RateLimit, hitsAddends []uint64,
) {
defer this.waitGroup.Done()
for i, cacheKey := range cacheKeys {
if cacheKey.Key == "" || isOverLimitWithLocalCache[i] {
continue
}

_, err := this.client.Increment(cacheKey.Key, hitsAddend)
_, err := this.client.Increment(cacheKey.Key, hitsAddends[i])
if err == memcache.ErrCacheMiss {
expirationSeconds := utils.UnitToDivider(limits[i].Limit.Unit)
if this.expirationJitterMaxSeconds > 0 {
Expand All @@ -168,13 +168,13 @@ func (this *rateLimitMemcacheImpl) increaseAsync(cacheKeys []limiter.CacheKey, i
// Need to add instead of increment.
err = this.client.Add(&memcache.Item{
Key: cacheKey.Key,
Value: []byte(strconv.FormatUint(hitsAddend, 10)),
Value: []byte(strconv.FormatUint(hitsAddends[i], 10)),
Expiration: int32(expirationSeconds),
})
if err == memcache.ErrNotStored {
// There was a race condition to do this add. We should be able to increment
// now instead.
_, err := this.client.Increment(cacheKey.Key, hitsAddend)
_, err := this.client.Increment(cacheKey.Key, hitsAddends[i])
if err != nil {
logger.Errorf("Failed to increment key %s after failing to add: %s", cacheKey.Key, err)
continue
Expand Down
Loading

0 comments on commit 38500fe

Please sign in to comment.