Skip to content

Commit

Permalink
pebble: use token bucket for rate limiting
Browse files Browse the repository at this point in the history
Switch rate limiter from using a copy of `x/time/rate` to the
TokenBucket from Cockroach, which has a nicer interface.

We change `rate.Limiter` to a thin wrapper that provides thread safety
and a few convenience methods.
  • Loading branch information
RaduBerinde committed Jun 15, 2023
1 parent 9f82737 commit 9a02977
Show file tree
Hide file tree
Showing 14 changed files with 239 additions and 1,001 deletions.
13 changes: 4 additions & 9 deletions cmd/pebble/random.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,25 +73,20 @@ func (f *rateFlag) newRateLimiter() *rate.Limiter {
return nil
}
rng := randvar.NewRand()
limiter := rate.NewLimiter(rate.Limit(f.Uint64(rng)), 1)
limiter := rate.NewLimiter(float64(f.Uint64(rng)), 1)
if f.fluctuateDuration != 0 {
go func(limiter *rate.Limiter) {
ticker := time.NewTicker(f.fluctuateDuration)
for range ticker.C {
limiter.SetLimit(rate.Limit(f.Uint64(rng)))
limiter.SetRate(float64(f.Uint64(rng)))
}
}(limiter)
}
return limiter
}

func wait(l *rate.Limiter) {
if l == nil {
return
}

d := l.DelayN(time.Now(), 1)
if d > 0 && d != rate.InfDuration {
time.Sleep(d)
if l != nil {
l.Wait(1)
}
}
4 changes: 2 additions & 2 deletions cmd/pebble/write_bench.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ func newPauseWriter(y *ycsb, initialRate float64) *pauseWriter {
const burst = 1
return &pauseWriter{
y: y,
limiter: rate.NewLimiter(rate.Limit(initialRate), burst),
limiter: rate.NewLimiter(float64(initialRate), burst),
pauseC: make(chan struct{}),
unpauseC: make(chan struct{}),
}
Expand Down Expand Up @@ -479,5 +479,5 @@ func (w *pauseWriter) unpause() {

// setRate sets the rate limit for this writer.
func (w *pauseWriter) setRate(r float64) {
w.limiter.SetLimit(rate.Limit(r))
w.limiter.SetRate(r)
}
8 changes: 4 additions & 4 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -3653,8 +3653,8 @@ func (d *DB) doDeleteObsoleteFiles(jobID int) {
// must NOT be held when calling this method.
func (d *DB) paceAndDeleteObsoleteFiles(jobID int, files []obsoleteFile) {
defer d.deleters.Done()
pacer := (pacer)(nilPacer)
if d.opts.TargetByteDeletionRate > 0 {
var pacer *deletionPacer
if d.deletionLimiter != nil {
pacer = newDeletionPacer(d.deletionLimiter, d.getDeletionPacerInfo)
}

Expand All @@ -3665,8 +3665,8 @@ func (d *DB) paceAndDeleteObsoleteFiles(jobID int, files []obsoleteFile) {
meta, err := d.objProvider.Lookup(of.fileType, of.fileNum)
// If we get an error here, deleteObsoleteObject won't actually delete
// anything, so we don't need to throttle.
if err == nil && !meta.IsShared() {
_ = pacer.maybeThrottle(of.fileSize)
if pacer != nil && err == nil && !meta.IsShared() {
pacer.maybeThrottle(of.fileSize)
}
d.mu.Lock()
d.mu.versions.metrics.Table.ObsoleteCount--
Expand Down
4 changes: 3 additions & 1 deletion db.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/pebble/internal/keyspan"
"github.com/cockroachdb/pebble/internal/manifest"
"github.com/cockroachdb/pebble/internal/manual"
"github.com/cockroachdb/pebble/internal/rate"
"github.com/cockroachdb/pebble/objstorage"
"github.com/cockroachdb/pebble/rangekey"
"github.com/cockroachdb/pebble/record"
Expand Down Expand Up @@ -306,7 +307,8 @@ type DB struct {
closed *atomic.Value
closedCh chan struct{}

deletionLimiter limiter
// deletionLimiter is set when TargetByteDeletionRate is set.
deletionLimiter *rate.Limiter

// Async deletion jobs spawned by cleaners increment this WaitGroup, and
// call Done when completed. Once `d.mu.cleaning` is false, the db.Close()
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/cockroachdb/datadriven v1.0.3-0.20230413201302-be42291fc80f
github.com/cockroachdb/errors v1.8.1
github.com/cockroachdb/redact v1.0.8
github.com/cockroachdb/tokenbucket v0.0.0-20230613231145-182959a1fad6
github.com/ghemawat/stream v0.0.0-20171120220530-696b145b53b9
github.com/golang/snappy v0.0.4
github.com/guptarohit/asciigraph v0.5.5
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ github.com/cockroachdb/redact v1.0.8 h1:8QG/764wK+vmEYoOlfobpe12EQcS81ukx/a4hdVM
github.com/cockroachdb/redact v1.0.8/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg=
github.com/cockroachdb/sentry-go v0.6.1-cockroachdb.2 h1:IKgmqgMQlVJIZj19CdocBeSfSaiCbEBZGKODaixqtHM=
github.com/cockroachdb/sentry-go v0.6.1-cockroachdb.2/go.mod h1:8BT+cPK6xvFOcRlk0R8eg+OTkcqI6baNH4xAkpiYVvQ=
github.com/cockroachdb/tokenbucket v0.0.0-20230613231145-182959a1fad6 h1:DJK8W/iB+s/qkTtmXSrHA49lp5O3OsR7E6z4byOLy34=
github.com/cockroachdb/tokenbucket v0.0.0-20230613231145-182959a1fad6/go.mod h1:7nc4anLGjupUW/PeY5qiNYsdNXj7zopG+eqsS7To5IQ=
github.com/codegangsta/inject v0.0.0-20150114235600-33e0aa1cb7c0/go.mod h1:4Zcjuz89kmFXt9morQgcfYZAYZ5n8WHjt81YYWIwtTM=
github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
Expand Down
23 changes: 11 additions & 12 deletions internal/pacertoy/pebble/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package main

import (
"context"
"fmt"
"math"
"os"
Expand Down Expand Up @@ -64,10 +63,10 @@ func (p *compactionPacer) fill(n int64) {
}

func (p *compactionPacer) drain(n int64, delay bool) bool {
p.maxDrainer.WaitN(context.Background(), int(n))
p.maxDrainer.Wait(float64(n))

if delay {
p.minDrainer.WaitN(context.Background(), int(n))
p.minDrainer.Wait(float64(n))
}
level := atomic.AddInt64(&p.level, -n)
return level <= compactionDebtSlowdownThreshold
Expand Down Expand Up @@ -99,10 +98,10 @@ func (p *flushPacer) fill(n int64) {
}

func (p *flushPacer) drain(n int64, delay bool) bool {
p.maxDrainer.WaitN(context.Background(), int(n))
p.maxDrainer.Wait(float64(n))

if delay {
p.minDrainer.WaitN(context.Background(), int(n))
p.minDrainer.Wait(float64(n))
}
level := atomic.AddInt64(&p.level, -n)
p.fillCond.Signal()
Expand Down Expand Up @@ -275,16 +274,16 @@ func (db *DB) delayMemtableDrain() {
db.mu.Lock()
if compactionDebt > compactionDebtSlowdownThreshold {
// Compaction debt is above the threshold and the debt is growing. Throttle memtable flushing.
drainLimit := maxFlushRate * rate.Limit(compactionDebtSlowdownThreshold/compactionDebt)
drainLimit := maxFlushRate * float64(compactionDebtSlowdownThreshold/compactionDebt)
if drainLimit > 0 && drainLimit <= maxFlushRate {
db.flushPacer.maxDrainer.SetLimit(drainLimit)
db.flushPacer.maxDrainer.SetRate(drainLimit)
}
} else {
// Continuously speed up memtable flushing to make sure that slowdown signal did not
// decrease the memtable flush rate by too much.
drainLimit := db.flushPacer.maxDrainer.Limit() * 1.05
drainLimit := db.flushPacer.maxDrainer.Rate() * 1.05
if drainLimit > 0 && drainLimit <= maxFlushRate {
db.flushPacer.maxDrainer.SetLimit(drainLimit)
db.flushPacer.maxDrainer.SetRate(drainLimit)
}
}

Expand Down Expand Up @@ -320,7 +319,7 @@ func simulateWrite(db *DB, measureLatencyMode bool) {

setRate := func(mb int) {
fmt.Printf("filling at %d MB/sec\n", mb)
limiter.SetLimit(rate.Limit(mb << 20))
limiter.SetRate(float64(mb << 20))
}

if !measureLatencyMode {
Expand All @@ -346,7 +345,7 @@ func simulateWrite(db *DB, measureLatencyMode bool) {
for totalWrites <= writeAmount {
size := 1000 + rng.Int63n(50)
if !measureLatencyMode {
limiter.WaitN(context.Background(), int(size))
limiter.Wait(float64(size))
}
db.fillMemtable(size)

Expand Down Expand Up @@ -399,7 +398,7 @@ func main() {
db.compactionMu.Unlock()
totalCompactionBytes := atomic.LoadInt64(&db.compactionPacer.level)
compactionDebt := math.Max(float64(totalCompactionBytes)-l0CompactionThreshold*memtableSize, 0.0)
maxFlushRate := db.flushPacer.maxDrainer.Limit()
maxFlushRate := db.flushPacer.maxDrainer.Rate()

now := time.Now()
elapsed := now.Sub(lastNow).Seconds()
Expand Down
23 changes: 11 additions & 12 deletions internal/pacertoy/rocksdb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package main

import (
"context"
"fmt"
"math"
"sync"
Expand Down Expand Up @@ -54,7 +53,7 @@ func (p *compactionPacer) fill(n int64) {
}

func (p *compactionPacer) drain(n int64) {
p.drainer.WaitN(context.Background(), int(n))
p.drainer.Wait(float64(n))

atomic.AddInt64(&p.level, -n)
}
Expand Down Expand Up @@ -256,15 +255,15 @@ func (db *DB) delayUserWrites() {
db.previouslyInDebt = true
if compactionDebt > db.prevCompactionDebt {
// Debt is growing.
drainLimit := db.writeLimiter.Limit() * 0.8
drainLimit := db.writeLimiter.Rate() * 0.8
if drainLimit > 0 {
db.writeLimiter.SetLimit(drainLimit)
db.writeLimiter.SetRate(drainLimit)
}
} else {
// Debt is shrinking.
drainLimit := db.writeLimiter.Limit() * 1 / 0.8
drainLimit := db.writeLimiter.Rate() * 1 / 0.8
if drainLimit <= maxWriteRate {
db.writeLimiter.SetLimit(drainLimit)
db.writeLimiter.SetRate(drainLimit)
}
}
} else if db.previouslyInDebt {
Expand All @@ -275,9 +274,9 @@ func (db *DB) delayUserWrites() {
// If the DB recovers from delay conditions, we reward with reducing
// double the slowdown ratio. This is to balance the long term slowdown
// increase signal.
drainLimit := db.writeLimiter.Limit() * 1.4
drainLimit := db.writeLimiter.Rate() * 1.4
if drainLimit <= maxWriteRate {
db.writeLimiter.SetLimit(drainLimit)
db.writeLimiter.SetRate(drainLimit)
}
db.previouslyInDebt = false
}
Expand Down Expand Up @@ -311,7 +310,7 @@ func simulateWrite(db *DB) {

setRate := func(mb int) {
fmt.Printf("filling at %d MB/sec\n", mb)
limiter.SetLimit(rate.Limit(mb << 20))
limiter.SetRate(float64(mb << 20))
}

go func() {
Expand All @@ -328,8 +327,8 @@ func simulateWrite(db *DB) {

for {
size := 1000 + rng.Int63n(50)
limiter.WaitN(context.Background(), int(size))
db.writeLimiter.WaitN(context.Background(), int(size))
limiter.Wait(float64(size))
db.writeLimiter.Wait(float64(size))
db.fillMemtable(size)
}
}
Expand Down Expand Up @@ -362,7 +361,7 @@ func main() {
db.compactionMu.Unlock()
totalCompactionBytes := atomic.LoadInt64(&db.compactionPacer.level)
compactionDebt := math.Max(float64(totalCompactionBytes)-l0CompactionThreshold*memtableSize, 0.0)
maxWriteRate := db.writeLimiter.Limit()
maxWriteRate := db.writeLimiter.Rate()

now := time.Now()
elapsed := now.Sub(lastNow).Seconds()
Expand Down
Loading

0 comments on commit 9a02977

Please sign in to comment.