Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ratelimit: impl BBR-like algorithm #7246

Open
wants to merge 32 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
9c18e3f
add time window package
CabinfeverB Oct 18, 2023
b8be11c
refactor ratelimit pkg
CabinfeverB Oct 19, 2023
9d27388
Merge branch 'rate_limit/time_window' into rate_limit/bbr_v1
CabinfeverB Oct 19, 2023
d2a6ac3
refactor ratelimit pkg
CabinfeverB Oct 19, 2023
5c242b8
refactor ratelimit pkg
CabinfeverB Oct 19, 2023
a0e898e
add time window package
CabinfeverB Oct 19, 2023
b165137
Merge branch 'rate_limit/time_window' into rate_limit/bbr_v1
CabinfeverB Oct 19, 2023
b8cde6f
refactor test
CabinfeverB Oct 22, 2023
1d87c04
merge refactor
CabinfeverB Oct 22, 2023
c9c186d
impl bbr
CabinfeverB Oct 23, 2023
341c607
Merge branch 'master' into rate_limit/bbr_v1
CabinfeverB Oct 23, 2023
753c9d4
address comment
CabinfeverB Oct 23, 2023
9fb345f
Merge branch 'master' into rate_limit/refactor
CabinfeverB Oct 23, 2023
8c6a990
address comment
CabinfeverB Oct 23, 2023
f44cec1
Merge branch 'rate_limit/refactor' into rate_limit/bbr_v1
CabinfeverB Oct 23, 2023
12d9529
address comment
CabinfeverB Oct 23, 2023
1eba385
Merge branch 'rate_limit/refactor' into rate_limit/bbr_v1
CabinfeverB Oct 23, 2023
9132fc5
fix test
CabinfeverB Oct 25, 2023
e300b85
ci test
CabinfeverB Oct 26, 2023
59d9e23
ci test
CabinfeverB Oct 26, 2023
d86d468
ci test
CabinfeverB Oct 26, 2023
8ad5886
ci test
CabinfeverB Oct 26, 2023
90909ab
fix
CabinfeverB Oct 26, 2023
c48e6a5
fix test in ci
CabinfeverB Oct 26, 2023
defcd0e
bbr half window
CabinfeverB Nov 3, 2023
9993b38
Merge branch 'master' into rate_limit/bbr_v1
CabinfeverB Nov 8, 2023
68ba604
merge master
CabinfeverB Feb 6, 2024
def7088
merge master
CabinfeverB Feb 6, 2024
d3a4a3c
add comment
CabinfeverB Feb 7, 2024
b13305c
address comment
CabinfeverB Feb 9, 2024
535ad0c
add test
CabinfeverB Feb 19, 2024
348076e
fix test
CabinfeverB Feb 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
283 changes: 283 additions & 0 deletions pkg/ratelimit/bbr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,283 @@
// The MIT License (MIT)
// Copyright (c) 2022 go-kratos Project Authors.
//
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ratelimit

import (
"math"
"sort"
"sync/atomic"
"time"

"github.com/pingcap/log"
"github.com/tikv/pd/pkg/window"
"go.uber.org/zap"
)

const (
inf = int64(^uint64(0) >> 1)

defaultWindowSize = time.Second * 10
defaultBucketSize = 100
)

type cache struct {
val int64
time time.Time
}

type bbrOption func(*bbrConfig)

type bbrConfig struct {
// WindowSize defines time duration per window
Window time.Duration
// BucketNum defines bucket number for each window
Bucket int
}

func newConfig(opts ...bbrOption) *bbrConfig {
cfg := &bbrConfig{
Window: defaultWindowSize,
Bucket: defaultBucketSize,
}
for _, o := range opts {
o(cfg)
}
return cfg
}

// WithWindow with window size.
func WithWindow(d time.Duration) bbrOption {
return func(o *bbrConfig) {
o.Window = d
}
}

// WithBucket with bucket ize.
func WithBucket(b int) bbrOption {
return func(o *bbrConfig) {
o.Bucket = b
}
}

type bbrStatus struct {
maxInFlight atomic.Int64
minRT atomic.Int64
}

func (f *bbrStatus) storeMaxInFlight(m int64) {
f.maxInFlight.Store(m)
}

func (f *bbrStatus) getMaxInFlight() int64 {
return f.maxInFlight.Load()
}

func (f *bbrStatus) storeMinRT(m int64) {
f.minRT.Store(m)
}

func (f *bbrStatus) getMinRT() int64 {
return f.minRT.Load()
}

type feedbackOpt func(*bbrStatus)

type bbr struct {
cfg *bbrConfig
bucketPerSecond int64
bucketDuration time.Duration

// statistics
passStat window.RollingCounter
rtStat window.RollingCounter
inFlightStat window.RollingCounter

// full status
lastUpdateTime atomic.Value
inCheck atomic.Int64
bbrStatus *bbrStatus

feedbacks []feedbackOpt

maxPASSCache atomic.Value
minRtCache atomic.Value
}

func newBBR(cfg *bbrConfig, feedbacks ...feedbackOpt) *bbr {
bucketDuration := cfg.Window / time.Duration(cfg.Bucket)
passStat := window.NewRollingCounter(window.RollingCounterOpts{Size: cfg.Bucket, BucketDuration: bucketDuration})
rtStat := window.NewRollingCounter(window.RollingCounterOpts{Size: cfg.Bucket, BucketDuration: bucketDuration})
inFlightStat := window.NewRollingCounter(window.RollingCounterOpts{Size: cfg.Bucket, BucketDuration: bucketDuration})

limiter := &bbr{
cfg: cfg,
feedbacks: feedbacks,
bucketDuration: bucketDuration,
bucketPerSecond: int64(time.Second / bucketDuration),
passStat: passStat,
rtStat: rtStat,
inFlightStat: inFlightStat,
bbrStatus: &bbrStatus{},
}
limiter.bbrStatus.storeMaxInFlight(inf)
return limiter
}

// timespan returns the passed bucket count
// since lastTime, if it is one bucket duration earlier than
// the last recorded time, it will return the BucketNum.
func (l *bbr) timespan(lastTime time.Time) int {
v := int(time.Since(lastTime) / l.bucketDuration)
if v > -1 {
return v
}
return l.cfg.Bucket
}

func (l *bbr) getMaxInFlight() int64 {
return int64(math.Floor(float64(l.getMaxPASS()*l.getMinRT()*l.bucketPerSecond)/1e6) + 0.5)
}

func (l *bbr) getMaxPASS() int64 {
passCache := l.maxPASSCache.Load()
if passCache != nil {
ps := passCache.(*cache)
if l.timespan(ps.time) < 1 {
return ps.val
}
}
rawMaxPass := int64(l.passStat.Reduce(func(iterator window.Iterator) float64 {
sli := make([]float64, 0, l.cfg.Bucket-1)
var result = 1.0
for i := 1; iterator.Next() && i < l.cfg.Bucket; i++ {
bucket := iterator.Bucket()
count := 0.0
for _, p := range bucket.Points {
count += p
}
sli = append(sli, count)
result = math.Max(result, count)
}

sort.Slice(sli, func(i, j int) bool {
return sli[i] > sli[j]
})
return result
}))
l.maxPASSCache.Store(&cache{
val: rawMaxPass,
time: time.Now(),
})
return rawMaxPass
}

func (l *bbr) getMinRT() int64 {
rtCache := l.minRtCache.Load()
if rtCache != nil {
rc := rtCache.(*cache)
if l.timespan(rc.time) < 1 {
return rc.val
}
}
rawMinRT := int64(math.Ceil(l.rtStat.Reduce(func(iterator window.Iterator) float64 {
var result = float64(time.Minute)
for i := 1; iterator.Next() && i < l.cfg.Bucket; i++ {
bucket := iterator.Bucket()
if len(bucket.Points) == 0 {
continue
}
total := 0.0
for _, p := range bucket.Points {
total += p
}
log.Info("getMinRT", zap.Float64("total", total), zap.Int64("bucket.Count", bucket.Count))
CabinfeverB marked this conversation as resolved.
Show resolved Hide resolved
avg := total / float64(bucket.Count)
result = math.Min(result, avg)
}
return result
})))
if rawMinRT <= 0 {
rawMinRT = 1
}
l.minRtCache.Store(&cache{
val: rawMinRT,
time: time.Now(),
})
return rawMinRT
}

func (l *bbr) checkFullStatus() {
temp := l.lastUpdateTime.Load()
if temp != nil {
t := temp.(time.Time)
if l.timespan(t) < 1 {
return
}
}
if !l.inCheck.CompareAndSwap(0, 1) {
return
}
positive := 0
negative := 0
raises := math.Ceil(l.inFlightStat.Reduce(func(iterator window.Iterator) float64 {
var result = 0.
for i := 1; iterator.Next() && i < l.cfg.Bucket; i++ {
bucket := iterator.Bucket()
total := 0.0
if len(bucket.Points) == 0 {
negative++
continue
}
for _, p := range bucket.Points {
total += p
}
result += total
if total > 0 {
positive++
} else {
negative++
}
}
return result
}))

l.inCheck.Store(0)

if raises > 0 && positive > negative && l.bbrStatus.getMaxInFlight() == inf {
maxInFlight := l.getMaxInFlight()
l.bbrStatus.storeMaxInFlight(maxInFlight)
l.bbrStatus.storeMinRT(l.getMinRT())
for _, fd := range l.feedbacks {
fd(l.bbrStatus)
}
}
l.lastUpdateTime.Store(time.Now())
}

func (l *bbr) process() DoneFunc {
l.inFlightStat.Add(1)
start := time.Now().UnixMicro()
l.checkFullStatus()
return func() {
if rt := time.Now().UnixMicro() - start; rt > 0 {
l.rtStat.Add(rt)
}
l.inFlightStat.Add(-1)
l.passStat.Add(1)
}
}
Loading
Loading