diff --git a/cmd/pebble/random.go b/cmd/pebble/random.go index d41ed9b222..c098b740b1 100644 --- a/cmd/pebble/random.go +++ b/cmd/pebble/random.go @@ -73,12 +73,12 @@ func (f *rateFlag) newRateLimiter() *rate.Limiter { return nil } rng := randvar.NewRand() - limiter := rate.NewLimiter(rate.Limit(f.Uint64(rng)), 1) + limiter := rate.NewLimiter(float64(f.Uint64(rng)), 1) if f.fluctuateDuration != 0 { go func(limiter *rate.Limiter) { ticker := time.NewTicker(f.fluctuateDuration) for range ticker.C { - limiter.SetLimit(rate.Limit(f.Uint64(rng))) + limiter.SetRate(float64(f.Uint64(rng))) } }(limiter) } @@ -86,12 +86,7 @@ func (f *rateFlag) newRateLimiter() *rate.Limiter { } func wait(l *rate.Limiter) { - if l == nil { - return - } - - d := l.DelayN(time.Now(), 1) - if d > 0 && d != rate.InfDuration { - time.Sleep(d) + if l != nil { + l.Wait(1) } } diff --git a/cmd/pebble/write_bench.go b/cmd/pebble/write_bench.go index 1222204b36..397a5361b5 100644 --- a/cmd/pebble/write_bench.go +++ b/cmd/pebble/write_bench.go @@ -439,7 +439,7 @@ func newPauseWriter(y *ycsb, initialRate float64) *pauseWriter { const burst = 1 return &pauseWriter{ y: y, - limiter: rate.NewLimiter(rate.Limit(initialRate), burst), + limiter: rate.NewLimiter(float64(initialRate), burst), pauseC: make(chan struct{}), unpauseC: make(chan struct{}), } @@ -479,5 +479,5 @@ func (w *pauseWriter) unpause() { // setRate sets the rate limit for this writer. func (w *pauseWriter) setRate(r float64) { - w.limiter.SetLimit(rate.Limit(r)) + w.limiter.SetRate(r) } diff --git a/compaction.go b/compaction.go index 51b080b5d6..f7f9014ba0 100644 --- a/compaction.go +++ b/compaction.go @@ -3653,8 +3653,8 @@ func (d *DB) doDeleteObsoleteFiles(jobID int) { // must NOT be held when calling this method. func (d *DB) paceAndDeleteObsoleteFiles(jobID int, files []obsoleteFile) { defer d.deleters.Done() - pacer := (pacer)(nilPacer) - if d.opts.TargetByteDeletionRate > 0 { + var pacer *deletionPacer + if d.deletionLimiter != nil { pacer = newDeletionPacer(d.deletionLimiter, d.getDeletionPacerInfo) } @@ -3665,8 +3665,8 @@ func (d *DB) paceAndDeleteObsoleteFiles(jobID int, files []obsoleteFile) { meta, err := d.objProvider.Lookup(of.fileType, of.fileNum) // If we get an error here, deleteObsoleteObject won't actually delete // anything, so we don't need to throttle. - if err == nil && !meta.IsShared() { - _ = pacer.maybeThrottle(of.fileSize) + if pacer != nil && err == nil && !meta.IsShared() { + pacer.maybeThrottle(of.fileSize) } d.mu.Lock() d.mu.versions.metrics.Table.ObsoleteCount-- diff --git a/db.go b/db.go index 06a4e86af6..c22fd93f33 100644 --- a/db.go +++ b/db.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/pebble/internal/keyspan" "github.com/cockroachdb/pebble/internal/manifest" "github.com/cockroachdb/pebble/internal/manual" + "github.com/cockroachdb/pebble/internal/rate" "github.com/cockroachdb/pebble/objstorage" "github.com/cockroachdb/pebble/rangekey" "github.com/cockroachdb/pebble/record" @@ -306,7 +307,8 @@ type DB struct { closed *atomic.Value closedCh chan struct{} - deletionLimiter limiter + // deletionLimiter is set when TargetByteDeletionRate is set. + deletionLimiter *rate.Limiter // Async deletion jobs spawned by cleaners increment this WaitGroup, and // call Done when completed. Once `d.mu.cleaning` is false, the db.Close() diff --git a/go.mod b/go.mod index 418ef54062..43452c16d8 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/cockroachdb/datadriven v1.0.3-0.20230413201302-be42291fc80f github.com/cockroachdb/errors v1.8.1 github.com/cockroachdb/redact v1.0.8 + github.com/cockroachdb/tokenbucket v0.0.0-20230613231145-182959a1fad6 github.com/ghemawat/stream v0.0.0-20171120220530-696b145b53b9 github.com/golang/snappy v0.0.4 github.com/guptarohit/asciigraph v0.5.5 diff --git a/go.sum b/go.sum index 8921a1975b..c5c1dcf926 100644 --- a/go.sum +++ b/go.sum @@ -89,6 +89,8 @@ github.com/cockroachdb/redact v1.0.8 h1:8QG/764wK+vmEYoOlfobpe12EQcS81ukx/a4hdVM github.com/cockroachdb/redact v1.0.8/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg= github.com/cockroachdb/sentry-go v0.6.1-cockroachdb.2 h1:IKgmqgMQlVJIZj19CdocBeSfSaiCbEBZGKODaixqtHM= github.com/cockroachdb/sentry-go v0.6.1-cockroachdb.2/go.mod h1:8BT+cPK6xvFOcRlk0R8eg+OTkcqI6baNH4xAkpiYVvQ= +github.com/cockroachdb/tokenbucket v0.0.0-20230613231145-182959a1fad6 h1:DJK8W/iB+s/qkTtmXSrHA49lp5O3OsR7E6z4byOLy34= +github.com/cockroachdb/tokenbucket v0.0.0-20230613231145-182959a1fad6/go.mod h1:7nc4anLGjupUW/PeY5qiNYsdNXj7zopG+eqsS7To5IQ= github.com/codegangsta/inject v0.0.0-20150114235600-33e0aa1cb7c0/go.mod h1:4Zcjuz89kmFXt9morQgcfYZAYZ5n8WHjt81YYWIwtTM= github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= diff --git a/internal/pacertoy/pebble/main.go b/internal/pacertoy/pebble/main.go index 9542d392c5..6dd9405c87 100644 --- a/internal/pacertoy/pebble/main.go +++ b/internal/pacertoy/pebble/main.go @@ -5,7 +5,6 @@ package main import ( - "context" "fmt" "math" "os" @@ -64,10 +63,10 @@ func (p *compactionPacer) fill(n int64) { } func (p *compactionPacer) drain(n int64, delay bool) bool { - p.maxDrainer.WaitN(context.Background(), int(n)) + p.maxDrainer.Wait(float64(n)) if delay { - p.minDrainer.WaitN(context.Background(), int(n)) + p.minDrainer.Wait(float64(n)) } level := atomic.AddInt64(&p.level, -n) return level <= compactionDebtSlowdownThreshold @@ -99,10 +98,10 @@ func (p *flushPacer) fill(n int64) { } func (p *flushPacer) drain(n int64, delay bool) bool { - p.maxDrainer.WaitN(context.Background(), int(n)) + p.maxDrainer.Wait(float64(n)) if delay { - p.minDrainer.WaitN(context.Background(), int(n)) + p.minDrainer.Wait(float64(n)) } level := atomic.AddInt64(&p.level, -n) p.fillCond.Signal() @@ -275,16 +274,16 @@ func (db *DB) delayMemtableDrain() { db.mu.Lock() if compactionDebt > compactionDebtSlowdownThreshold { // Compaction debt is above the threshold and the debt is growing. Throttle memtable flushing. - drainLimit := maxFlushRate * rate.Limit(compactionDebtSlowdownThreshold/compactionDebt) + drainLimit := maxFlushRate * float64(compactionDebtSlowdownThreshold/compactionDebt) if drainLimit > 0 && drainLimit <= maxFlushRate { - db.flushPacer.maxDrainer.SetLimit(drainLimit) + db.flushPacer.maxDrainer.SetRate(drainLimit) } } else { // Continuously speed up memtable flushing to make sure that slowdown signal did not // decrease the memtable flush rate by too much. - drainLimit := db.flushPacer.maxDrainer.Limit() * 1.05 + drainLimit := db.flushPacer.maxDrainer.Rate() * 1.05 if drainLimit > 0 && drainLimit <= maxFlushRate { - db.flushPacer.maxDrainer.SetLimit(drainLimit) + db.flushPacer.maxDrainer.SetRate(drainLimit) } } @@ -320,7 +319,7 @@ func simulateWrite(db *DB, measureLatencyMode bool) { setRate := func(mb int) { fmt.Printf("filling at %d MB/sec\n", mb) - limiter.SetLimit(rate.Limit(mb << 20)) + limiter.SetRate(float64(mb << 20)) } if !measureLatencyMode { @@ -346,7 +345,7 @@ func simulateWrite(db *DB, measureLatencyMode bool) { for totalWrites <= writeAmount { size := 1000 + rng.Int63n(50) if !measureLatencyMode { - limiter.WaitN(context.Background(), int(size)) + limiter.Wait(float64(size)) } db.fillMemtable(size) @@ -399,7 +398,7 @@ func main() { db.compactionMu.Unlock() totalCompactionBytes := atomic.LoadInt64(&db.compactionPacer.level) compactionDebt := math.Max(float64(totalCompactionBytes)-l0CompactionThreshold*memtableSize, 0.0) - maxFlushRate := db.flushPacer.maxDrainer.Limit() + maxFlushRate := db.flushPacer.maxDrainer.Rate() now := time.Now() elapsed := now.Sub(lastNow).Seconds() diff --git a/internal/pacertoy/rocksdb/main.go b/internal/pacertoy/rocksdb/main.go index 297399b640..7ba9dc61a1 100644 --- a/internal/pacertoy/rocksdb/main.go +++ b/internal/pacertoy/rocksdb/main.go @@ -5,7 +5,6 @@ package main import ( - "context" "fmt" "math" "sync" @@ -54,7 +53,7 @@ func (p *compactionPacer) fill(n int64) { } func (p *compactionPacer) drain(n int64) { - p.drainer.WaitN(context.Background(), int(n)) + p.drainer.Wait(float64(n)) atomic.AddInt64(&p.level, -n) } @@ -256,15 +255,15 @@ func (db *DB) delayUserWrites() { db.previouslyInDebt = true if compactionDebt > db.prevCompactionDebt { // Debt is growing. - drainLimit := db.writeLimiter.Limit() * 0.8 + drainLimit := db.writeLimiter.Rate() * 0.8 if drainLimit > 0 { - db.writeLimiter.SetLimit(drainLimit) + db.writeLimiter.SetRate(drainLimit) } } else { // Debt is shrinking. - drainLimit := db.writeLimiter.Limit() * 1 / 0.8 + drainLimit := db.writeLimiter.Rate() * 1 / 0.8 if drainLimit <= maxWriteRate { - db.writeLimiter.SetLimit(drainLimit) + db.writeLimiter.SetRate(drainLimit) } } } else if db.previouslyInDebt { @@ -275,9 +274,9 @@ func (db *DB) delayUserWrites() { // If the DB recovers from delay conditions, we reward with reducing // double the slowdown ratio. This is to balance the long term slowdown // increase signal. - drainLimit := db.writeLimiter.Limit() * 1.4 + drainLimit := db.writeLimiter.Rate() * 1.4 if drainLimit <= maxWriteRate { - db.writeLimiter.SetLimit(drainLimit) + db.writeLimiter.SetRate(drainLimit) } db.previouslyInDebt = false } @@ -311,7 +310,7 @@ func simulateWrite(db *DB) { setRate := func(mb int) { fmt.Printf("filling at %d MB/sec\n", mb) - limiter.SetLimit(rate.Limit(mb << 20)) + limiter.SetRate(float64(mb << 20)) } go func() { @@ -328,8 +327,8 @@ func simulateWrite(db *DB) { for { size := 1000 + rng.Int63n(50) - limiter.WaitN(context.Background(), int(size)) - db.writeLimiter.WaitN(context.Background(), int(size)) + limiter.Wait(float64(size)) + db.writeLimiter.Wait(float64(size)) db.fillMemtable(size) } } @@ -362,7 +361,7 @@ func main() { db.compactionMu.Unlock() totalCompactionBytes := atomic.LoadInt64(&db.compactionPacer.level) compactionDebt := math.Max(float64(totalCompactionBytes)-l0CompactionThreshold*memtableSize, 0.0) - maxWriteRate := db.writeLimiter.Limit() + maxWriteRate := db.writeLimiter.Rate() now := time.Now() elapsed := now.Sub(lastNow).Seconds() diff --git a/internal/rate/rate.go b/internal/rate/rate.go index 565626f0fb..f5f801cbb0 100644 --- a/internal/rate/rate.go +++ b/internal/rate/rate.go @@ -1,387 +1,96 @@ -// Copyright 2015 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. +// Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. // Package rate provides a rate limiter. package rate // import "github.com/cockroachdb/pebble/internal/rate" import ( - "context" - "math" "sync" "time" - "github.com/cockroachdb/errors" + "github.com/cockroachdb/tokenbucket" ) -// Limit defines the maximum frequency of some events. -// Limit is represented as number of events per second. -// A zero Limit allows no events. -type Limit float64 - -// Inf is the infinite rate limit; it allows all events (even if burst is zero). -const Inf = Limit(math.MaxFloat64) - -// Every converts a minimum time interval between events to a Limit. -func Every(interval time.Duration) Limit { - if interval <= 0 { - return Inf - } - return 1 / Limit(interval.Seconds()) -} - // A Limiter controls how frequently events are allowed to happen. // It implements a "token bucket" of size b, initially full and refilled // at rate r tokens per second. +// // Informally, in any large enough time interval, the Limiter limits the // rate to r tokens per second, with a maximum burst size of b events. -// As a special case, if r == Inf (the infinite rate), b is ignored. -// See https://en.wikipedia.org/wiki/Token_bucket for more about token buckets. // -// The zero value is a valid Limiter, but it will reject all events. -// Use NewLimiter to create non-zero Limiters. -// -// Limiter has three main methods, Allow, Reserve, and Wait. -// Most callers should use Wait. -// -// Each of the three methods consumes a single token. -// They differ in their behavior when no token is available. -// If no token is available, Allow returns false. -// If no token is available, Reserve returns a reservation for a future token -// and the amount of time the caller must wait before using it. -// If no token is available, Wait blocks until one can be obtained -// or its associated context.Context is canceled. -// -// The methods AllowN, ReserveN, and WaitN consume n tokens. +// Limiter is thread-safe. type Limiter struct { - limit Limit - burst int - - mu sync.Mutex - tokens float64 - // last is the last time the limiter's tokens field was updated - last time.Time - // lastEvent is the latest time of a rate-limited event (past or future) - lastEvent time.Time -} - -// Limit returns the maximum overall event rate. -func (lim *Limiter) Limit() Limit { - lim.mu.Lock() - defer lim.mu.Unlock() - return lim.limit -} - -// Burst returns the maximum burst size. Burst is the maximum number of tokens -// that can be consumed in a single call to Allow, Reserve, or Wait, so higher -// Burst values allow more events to happen at once. -// A zero Burst allows no events, unless limit == Inf. -func (lim *Limiter) Burst() int { - return lim.burst + mu struct { + sync.Mutex + tb tokenbucket.TokenBucket + rate float64 + burst float64 + } + sleepFn func(d time.Duration) } // NewLimiter returns a new Limiter that allows events up to rate r and permits // bursts of at most b tokens. -func NewLimiter(r Limit, b int) *Limiter { - return &Limiter{ - limit: r, - burst: b, - } -} - -// Allow is shorthand for AllowN(time.Now(), 1). -func (lim *Limiter) Allow() bool { - return lim.AllowN(time.Now(), 1) -} - -// AllowN reports whether n events may happen at time now. -// Use this method if you intend to drop / skip events that exceed the rate limit. -// Otherwise use Reserve or Wait. -func (lim *Limiter) AllowN(now time.Time, n int) bool { - return lim.reserveN(now, n, 0).ok -} - -// A Reservation holds information about events that are permitted by a Limiter to happen after a delay. -// A Reservation may be canceled, which may enable the Limiter to permit additional events. -type Reservation struct { - ok bool - lim *Limiter - tokens int - timeToAct time.Time - // This is the Limit at reservation time, it can change later. - limit Limit -} - -// OK returns whether the limiter can provide the requested number of tokens -// within the maximum wait time. If OK is false, Delay returns InfDuration, and -// Cancel does nothing. -func (r *Reservation) OK() bool { - return r.ok -} - -// Delay is shorthand for DelayFrom(time.Now()). -func (r *Reservation) Delay() time.Duration { - return r.DelayFrom(time.Now()) -} - -// InfDuration is the duration returned by Delay when a Reservation is not OK. -const InfDuration = time.Duration(1<<63 - 1) - -// DelayFrom returns the duration for which the reservation holder must wait -// before taking the reserved action. Zero duration means act immediately. -// InfDuration means the limiter cannot grant the tokens requested in this -// Reservation within the maximum wait time. -func (r *Reservation) DelayFrom(now time.Time) time.Duration { - if !r.ok { - return InfDuration - } - delay := r.timeToAct.Sub(now) - if delay < 0 { - return 0 - } - return delay -} - -// Cancel is shorthand for CancelAt(time.Now()). -func (r *Reservation) Cancel() { - r.CancelAt(time.Now()) -} - -// CancelAt indicates that the reservation holder will not perform the reserved action -// and reverses the effects of this Reservation on the rate limit as much as possible, -// considering that other reservations may have already been made. -func (r *Reservation) CancelAt(now time.Time) { - if !r.ok { - return - } - - r.lim.mu.Lock() - defer r.lim.mu.Unlock() - - if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(now) { - return - } - - // calculate tokens to restore - // The duration between lim.lastEvent and r.timeToAct tells us how many tokens were reserved - // after r was obtained. These tokens should not be restored. - restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct)) - if restoreTokens <= 0 { - return - } - // advance time to now - now, _, tokens := r.lim.advance(now) - // calculate new number of tokens - tokens += restoreTokens - if burst := float64(r.lim.burst); tokens > burst { - tokens = burst - } - // update state - r.lim.last = now - r.lim.tokens = tokens - if r.timeToAct == r.lim.lastEvent { - prevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens))) - if !prevEvent.Before(now) { - r.lim.lastEvent = prevEvent +func NewLimiter(r float64, b float64) *Limiter { + l := &Limiter{} + l.mu.tb.Init(tokenbucket.TokensPerSecond(r), tokenbucket.Tokens(b)) + l.mu.rate = r + l.mu.burst = b + return l +} + +// NewLimiterWithCustomTime returns a new Limiter that allows events up to rate +// r and permits bursts of at most b tokens. The limiter uses the given +// functions to retrieve the current time and to sleep (useful for testing). +func NewLimiterWithCustomTime( + r float64, b float64, nowFn func() time.Time, sleepFn func(d time.Duration), +) *Limiter { + l := &Limiter{} + l.mu.tb.InitWithNowFn(tokenbucket.TokensPerSecond(r), tokenbucket.Tokens(b), nowFn) + l.mu.rate = r + l.mu.burst = b + l.sleepFn = sleepFn + return l +} + +// Wait sleeps until enough tokens are available. If n is more than the burst, +// the token bucket will go into debt, delaying future operations. +func (l *Limiter) Wait(n float64) { + for { + l.mu.Lock() + ok, d := l.mu.tb.TryToFulfill(tokenbucket.Tokens(n)) + l.mu.Unlock() + if ok { + return } - } -} - -// Reserve is shorthand for ReserveN(time.Now(), 1). -func (lim *Limiter) Reserve() Reservation { - return lim.ReserveN(time.Now(), 1) -} - -// ReserveN returns a Reservation that indicates how long the caller must wait before n events happen. -// The Limiter takes this Reservation into account when allowing future events. -// ReserveN returns false if n exceeds the Limiter's burst size. -// Usage example: -// -// r := lim.ReserveN(time.Now(), 1) -// if !r.OK() { -// // Not allowed to act! Did you remember to set lim.burst to be > 0 ? -// return -// } -// time.Sleep(r.Delay()) -// Act() -// -// Use this method if you wish to wait and slow down in accordance with the rate limit without dropping events. -// If you need to respect a deadline or cancel the delay, use Wait instead. -// To drop or skip events exceeding rate limit, use Allow instead. -func (lim *Limiter) ReserveN(now time.Time, n int) Reservation { - return lim.reserveN(now, n, InfDuration) -} - -// Wait is shorthand for WaitN(ctx, 1). -func (lim *Limiter) Wait(ctx context.Context) (err error) { - return lim.WaitN(ctx, 1) -} - -// WaitN blocks until lim permits n events to happen. -// It returns an error if n exceeds the Limiter's burst size, the Context is -// canceled, or the expected wait time exceeds the Context's Deadline. -func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) { - if n > lim.burst && lim.limit != Inf { - return errors.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", errors.Safe(n), errors.Safe(lim.burst)) - } - // Check if ctx is already cancelled - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - // Determine wait limit - now := time.Now() - waitLimit := InfDuration - if deadline, ok := ctx.Deadline(); ok { - waitLimit = deadline.Sub(now) - } - // Reserve - r := lim.reserveN(now, n, waitLimit) - if !r.ok { - return errors.Errorf("rate: Wait(n=%d) would exceed context deadline", errors.Safe(n)) - } - // Wait - d := r.DelayFrom(now) - if d <= 0 { - return nil - } - t := time.NewTimer(d) - defer t.Stop() - select { - case <-t.C: - // We can proceed. - return nil - case <-ctx.Done(): - // Context was canceled before we could proceed. Cancel the - // reservation, which may permit other events to proceed sooner. - r.Cancel() - return ctx.Err() - } -} - -// Delay is shorthand for DelayN(time.Now(), 1). -func (lim *Limiter) Delay() time.Duration { - return lim.DelayN(time.Now(), 1) -} - -// DelayN returns the delay to wait to permit n events to happen. Zero duration -// means act immediately. InfDuration means the limiter cannot grant the tokens -// requested within the maximum wait time. -func (lim *Limiter) DelayN(now time.Time, n int) time.Duration { - r := lim.reserveN(now, n, InfDuration) - return r.DelayFrom(now) -} - -// SetLimit is shorthand for SetLimitAt(time.Now(), newLimit). -func (lim *Limiter) SetLimit(newLimit Limit) { - lim.SetLimitAt(time.Now(), newLimit) -} - -// SetLimitAt sets a new Limit for the limiter. The new Limit, and Burst, may be violated -// or underutilized by those which reserved (using Reserve or Wait) but did not yet act -// before SetLimitAt was called. -func (lim *Limiter) SetLimitAt(now time.Time, newLimit Limit) { - lim.mu.Lock() - defer lim.mu.Unlock() - - now, _, tokens := lim.advance(now) - - lim.last = now - lim.tokens = tokens - lim.limit = newLimit -} - -// reserveN is a helper method for AllowN, ReserveN, and WaitN. -// maxFutureReserve specifies the maximum reservation wait duration allowed. -// reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN. -func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation { - lim.mu.Lock() - - if lim.limit == Inf { - lim.mu.Unlock() - return Reservation{ - ok: true, - lim: lim, - tokens: n, - timeToAct: now, + if l.sleepFn != nil { + l.sleepFn(d) + } else { + time.Sleep(d) } } - - now, last, tokens := lim.advance(now) - - // Calculate the remaining number of tokens resulting from the request. - tokens -= float64(n) - - // Calculate the wait duration - var waitDuration time.Duration - if tokens < 0 { - waitDuration = lim.limit.durationFromTokens(-tokens) - } - - // Decide result - ok := n <= lim.burst && waitDuration <= maxFutureReserve - - // Prepare reservation - r := Reservation{ - ok: ok, - lim: lim, - limit: lim.limit, - } - if ok { - r.tokens = n - r.timeToAct = now.Add(waitDuration) - } - - // Update state - if ok { - lim.last = now - lim.tokens = tokens - lim.lastEvent = r.timeToAct - } else { - lim.last = last - } - - lim.mu.Unlock() - return r } -// advance calculates and returns an updated state for lim resulting from the passage of time. -// lim is not changed. -func (lim *Limiter) advance( - now time.Time, -) (newNow time.Time, newLast time.Time, newTokens float64) { - last := lim.last - if now.Before(last) { - last = now - } - - // Avoid making delta overflow below when last is very old. - maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens) - elapsed := now.Sub(last) - if elapsed > maxElapsed { - elapsed = maxElapsed - } - - // Calculate the new number of tokens, due to time that passed. - delta := lim.limit.tokensFromDuration(elapsed) - tokens := lim.tokens + delta - if burst := float64(lim.burst); tokens > burst { - tokens = burst - } - - return now, last, tokens +// Remove removes tokens for an operation that bypassed any waiting; it can put +// the token bucket into debt, delaying future operations. +func (l *Limiter) Remove(n float64) { + l.mu.Lock() + defer l.mu.Unlock() + l.mu.tb.Adjust(-tokenbucket.Tokens(n)) } -// durationFromTokens is a unit conversion function from the number of tokens to the duration -// of time it takes to accumulate them at a rate of limit tokens per second. -func (limit Limit) durationFromTokens(tokens float64) time.Duration { - seconds := tokens / float64(limit) - return time.Nanosecond * time.Duration(1e9*seconds) +// Rate returns the current rate limit. +func (l *Limiter) Rate() float64 { + l.mu.Lock() + defer l.mu.Unlock() + return l.mu.rate } -// tokensFromDuration is a unit conversion function from a time duration to the number of tokens -// which could be accumulated during that duration at a rate of limit tokens per second. -func (limit Limit) tokensFromDuration(d time.Duration) float64 { - return d.Seconds() * float64(limit) +// SetRate updates the rate limit. +func (l *Limiter) SetRate(r float64) { + l.mu.Lock() + defer l.mu.Unlock() + l.mu.tb.UpdateConfig(tokenbucket.TokensPerSecond(r), tokenbucket.Tokens(l.mu.burst)) + l.mu.rate = r } diff --git a/internal/rate/rate_test.go b/internal/rate/rate_test.go deleted file mode 100644 index fc49233eab..0000000000 --- a/internal/rate/rate_test.go +++ /dev/null @@ -1,452 +0,0 @@ -// Copyright 2015 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -//go:build go1.7 -// +build go1.7 - -package rate - -import ( - "context" - "math" - "runtime" - "sync" - "sync/atomic" - "testing" - "time" -) - -func TestLimit(t *testing.T) { - if Limit(10) == Inf { - t.Errorf("Limit(10) == Inf should be false") - } -} - -func closeEnough(a, b Limit) bool { - return (math.Abs(float64(a)/float64(b)) - 1.0) < 1e-9 -} - -func TestEvery(t *testing.T) { - cases := []struct { - interval time.Duration - lim Limit - }{ - {0, Inf}, - {-1, Inf}, - {1 * time.Nanosecond, Limit(1e9)}, - {1 * time.Microsecond, Limit(1e6)}, - {1 * time.Millisecond, Limit(1e3)}, - {10 * time.Millisecond, Limit(100)}, - {100 * time.Millisecond, Limit(10)}, - {1 * time.Second, Limit(1)}, - {2 * time.Second, Limit(0.5)}, - {time.Duration(2.5 * float64(time.Second)), Limit(0.4)}, - {4 * time.Second, Limit(0.25)}, - {10 * time.Second, Limit(0.1)}, - {time.Duration(math.MaxInt64), Limit(1e9 / float64(math.MaxInt64))}, - } - for _, tc := range cases { - lim := Every(tc.interval) - if !closeEnough(lim, tc.lim) { - t.Errorf("Every(%v) = %v want %v", tc.interval, lim, tc.lim) - } - } -} - -const ( - d = 100 * time.Millisecond -) - -var ( - t0 = time.Now() - t1 = t0.Add(time.Duration(1) * d) - t2 = t0.Add(time.Duration(2) * d) - t3 = t0.Add(time.Duration(3) * d) - t4 = t0.Add(time.Duration(4) * d) - t5 = t0.Add(time.Duration(5) * d) - t9 = t0.Add(time.Duration(9) * d) -) - -type allow struct { - t time.Time - n int - ok bool -} - -func run(t *testing.T, lim *Limiter, allows []allow) { - for i, allow := range allows { - ok := lim.AllowN(allow.t, allow.n) - if ok != allow.ok { - t.Errorf("step %d: lim.AllowN(%v, %v) = %v want %v", - i, allow.t, allow.n, ok, allow.ok) - } - } -} - -func TestLimiterBurst1(t *testing.T) { - run(t, NewLimiter(10, 1), []allow{ - {t0, 1, true}, - {t0, 1, false}, - {t0, 1, false}, - {t1, 1, true}, - {t1, 1, false}, - {t1, 1, false}, - {t2, 2, false}, // burst size is 1, so n=2 always fails - {t2, 1, true}, - {t2, 1, false}, - }) -} - -func TestLimiterBurst3(t *testing.T) { - run(t, NewLimiter(10, 3), []allow{ - {t0, 2, true}, - {t0, 2, false}, - {t0, 1, true}, - {t0, 1, false}, - {t1, 4, false}, - {t2, 1, true}, - {t3, 1, true}, - {t4, 1, true}, - {t4, 1, true}, - {t4, 1, false}, - {t4, 1, false}, - {t9, 3, true}, - {t9, 0, true}, - }) -} - -func TestLimiterJumpBackwards(t *testing.T) { - run(t, NewLimiter(10, 3), []allow{ - {t1, 1, true}, // start at t1 - {t0, 1, true}, // jump back to t0, two tokens remain - {t0, 1, true}, - {t0, 1, false}, - {t0, 1, false}, - {t1, 1, true}, // got a token - {t1, 1, false}, - {t1, 1, false}, - {t2, 1, true}, // got another token - {t2, 1, false}, - {t2, 1, false}, - }) -} - -func TestSimultaneousRequests(t *testing.T) { - const ( - limit = 1 - burst = 5 - numRequests = 15 - ) - var ( - wg sync.WaitGroup - numOK = uint32(0) - ) - - // Very slow replenishing bucket. - lim := NewLimiter(limit, burst) - - // Tries to take a token, atomically updates the counter and decreases the wait - // group counter. - f := func() { - defer wg.Done() - if ok := lim.Allow(); ok { - atomic.AddUint32(&numOK, 1) - } - } - - wg.Add(numRequests) - for i := 0; i < numRequests; i++ { - go f() - } - wg.Wait() - if numOK != burst { - t.Errorf("numOK = %d, want %d", numOK, burst) - } -} - -func TestLongRunningQPS(t *testing.T) { - if testing.Short() { - t.Skip("skipping in short mode") - } - - // The test runs for a few seconds executing many requests and then checks - // that overall number of requests is reasonable. - const ( - limit = 100 - burst = 100 - ) - var numOK = int32(0) - - lim := NewLimiter(limit, burst) - - // We simulate time advancing below to remove any timing related dependency - // that can cause the test to flake. - start := time.Now() - elapsed := 5 * time.Second - increment := 2 * time.Millisecond - count := int(elapsed / increment) - - for i := 0; i <= count; i++ { - if ok := lim.AllowN(start.Add(time.Duration(i)*2*time.Millisecond), 1); ok { - atomic.AddInt32(&numOK, 1) - } - } - ideal := burst + (limit * float64(elapsed) / float64(time.Second)) - - // We should never get more requests than allowed. - if want := int32(ideal + 1); numOK > want { - t.Errorf("numOK = %d, want %d (ideal %f)", numOK, want, ideal) - } - // We should get very close to the number of requests allowed. - if want := int32(0.99 * ideal); numOK < want { - t.Errorf("numOK = %d, want %d (ideal %f)", numOK, want, ideal) - } -} - -type request struct { - t time.Time - n int - act time.Time - ok bool -} - -// dFromDuration converts a duration to a multiple of the global constant d -func dFromDuration(dur time.Duration) int { - // Adding a millisecond to be swallowed by the integer division - // because we don't care about small inaccuracies - return int((dur + time.Millisecond) / d) -} - -// dSince returns multiples of d since t0 -func dSince(t time.Time) int { - return dFromDuration(t.Sub(t0)) -} - -func runReserve(t *testing.T, lim *Limiter, req request) *Reservation { - return runReserveMax(t, lim, req, InfDuration) -} - -func runReserveMax(t *testing.T, lim *Limiter, req request, maxReserve time.Duration) *Reservation { - r := lim.reserveN(req.t, req.n, maxReserve) - if r.ok && (dSince(r.timeToAct) != dSince(req.act)) || r.ok != req.ok { - t.Errorf("lim.reserveN(t%d, %v, %v) = (t%d, %v) want (t%d, %v)", - dSince(req.t), req.n, maxReserve, dSince(r.timeToAct), r.ok, dSince(req.act), req.ok) - } - return &r -} - -func TestSimpleReserve(t *testing.T) { - lim := NewLimiter(10, 2) - - runReserve(t, lim, request{t0, 2, t0, true}) - runReserve(t, lim, request{t0, 2, t2, true}) - runReserve(t, lim, request{t3, 2, t4, true}) -} - -func TestMix(t *testing.T) { - lim := NewLimiter(10, 2) - - runReserve(t, lim, request{t0, 3, t1, false}) // should return false because n > Burst - runReserve(t, lim, request{t0, 2, t0, true}) - run(t, lim, []allow{{t1, 2, false}}) // not enought tokens - don't allow - runReserve(t, lim, request{t1, 2, t2, true}) - run(t, lim, []allow{{t1, 1, false}}) // negative tokens - don't allow - run(t, lim, []allow{{t3, 1, true}}) -} - -func TestCancelInvalid(t *testing.T) { - lim := NewLimiter(10, 2) - - runReserve(t, lim, request{t0, 2, t0, true}) - r := runReserve(t, lim, request{t0, 3, t3, false}) - r.CancelAt(t0) // should have no effect - runReserve(t, lim, request{t0, 2, t2, true}) // did not get extra tokens -} - -func TestCancelLast(t *testing.T) { - lim := NewLimiter(10, 2) - - runReserve(t, lim, request{t0, 2, t0, true}) - r := runReserve(t, lim, request{t0, 2, t2, true}) - r.CancelAt(t1) // got 2 tokens back - runReserve(t, lim, request{t1, 2, t2, true}) -} - -func TestCancelTooLate(t *testing.T) { - lim := NewLimiter(10, 2) - - runReserve(t, lim, request{t0, 2, t0, true}) - r := runReserve(t, lim, request{t0, 2, t2, true}) - r.CancelAt(t3) // too late to cancel - should have no effect - runReserve(t, lim, request{t3, 2, t4, true}) -} - -func TestCancel0Tokens(t *testing.T) { - lim := NewLimiter(10, 2) - - runReserve(t, lim, request{t0, 2, t0, true}) - r := runReserve(t, lim, request{t0, 1, t1, true}) - runReserve(t, lim, request{t0, 1, t2, true}) - r.CancelAt(t0) // got 0 tokens back - runReserve(t, lim, request{t0, 1, t3, true}) -} - -func TestCancel1Token(t *testing.T) { - lim := NewLimiter(10, 2) - - runReserve(t, lim, request{t0, 2, t0, true}) - r := runReserve(t, lim, request{t0, 2, t2, true}) - runReserve(t, lim, request{t0, 1, t3, true}) - r.CancelAt(t2) // got 1 token back - runReserve(t, lim, request{t2, 2, t4, true}) -} - -func TestCancelMulti(t *testing.T) { - lim := NewLimiter(10, 4) - - runReserve(t, lim, request{t0, 4, t0, true}) - rA := runReserve(t, lim, request{t0, 3, t3, true}) - runReserve(t, lim, request{t0, 1, t4, true}) - rC := runReserve(t, lim, request{t0, 1, t5, true}) - rC.CancelAt(t1) // get 1 token back - rA.CancelAt(t1) // get 2 tokens back, as if C was never reserved - runReserve(t, lim, request{t1, 3, t5, true}) -} - -func TestReserveJumpBack(t *testing.T) { - lim := NewLimiter(10, 2) - - runReserve(t, lim, request{t1, 2, t1, true}) // start at t1 - runReserve(t, lim, request{t0, 1, t1, true}) // should violate Limit,Burst - runReserve(t, lim, request{t2, 2, t3, true}) -} - -func TestReserveJumpBackCancel(t *testing.T) { - lim := NewLimiter(10, 2) - - runReserve(t, lim, request{t1, 2, t1, true}) // start at t1 - r := runReserve(t, lim, request{t1, 2, t3, true}) - runReserve(t, lim, request{t1, 1, t4, true}) - r.CancelAt(t0) // cancel at t0, get 1 token back - runReserve(t, lim, request{t1, 2, t4, true}) // should violate Limit,Burst -} - -func TestReserveSetLimit(t *testing.T) { - lim := NewLimiter(5, 2) - - runReserve(t, lim, request{t0, 2, t0, true}) - runReserve(t, lim, request{t0, 2, t4, true}) - lim.SetLimitAt(t2, 10) - runReserve(t, lim, request{t2, 1, t4, true}) // violates Limit and Burst -} - -func TestReserveSetLimitCancel(t *testing.T) { - lim := NewLimiter(5, 2) - - runReserve(t, lim, request{t0, 2, t0, true}) - r := runReserve(t, lim, request{t0, 2, t4, true}) - lim.SetLimitAt(t2, 10) - r.CancelAt(t2) // 2 tokens back - runReserve(t, lim, request{t2, 2, t3, true}) -} - -func TestReserveMax(t *testing.T) { - lim := NewLimiter(10, 2) - maxT := d - - runReserveMax(t, lim, request{t0, 2, t0, true}, maxT) - runReserveMax(t, lim, request{t0, 1, t1, true}, maxT) // reserve for close future - runReserveMax(t, lim, request{t0, 1, t2, false}, maxT) // time to act too far in the future -} - -type wait struct { - name string - ctx context.Context - n int - delay int // in multiples of d - nilErr bool -} - -func runWait(t *testing.T, lim *Limiter, w wait) { - start := time.Now() - err := lim.WaitN(w.ctx, w.n) - delay := time.Since(start) - if (w.nilErr && err != nil) || (!w.nilErr && err == nil) || w.delay != dFromDuration(delay) { - errString := "" - if !w.nilErr { - errString = "" - } - t.Errorf("lim.WaitN(%v, lim, %v) = %v with delay %v ; want %v with delay %v", - w.name, w.n, err, delay, errString, d*time.Duration(w.delay)) - } -} - -func TestWaitSimple(t *testing.T) { - switch runtime.GOOS { - case "windows", "darwin": - t.Skip("flaky; see #1256") - } - - lim := NewLimiter(10, 3) - - ctx, cancel := context.WithCancel(context.Background()) - cancel() - runWait(t, lim, wait{"already-cancelled", ctx, 1, 0, false}) - - runWait(t, lim, wait{"exceed-burst-error", context.Background(), 4, 0, false}) - - runWait(t, lim, wait{"act-now", context.Background(), 2, 0, true}) - runWait(t, lim, wait{"act-later", context.Background(), 3, 2, true}) -} - -func TestWaitCancel(t *testing.T) { - switch runtime.GOOS { - case "windows", "darwin": - t.Skip("flaky; see #1187") - } - - lim := NewLimiter(10, 3) - - ctx, cancel := context.WithCancel(context.Background()) - runWait(t, lim, wait{"act-now", ctx, 2, 0, true}) // after this lim.tokens = 1 - go func() { - time.Sleep(d) - cancel() - }() - runWait(t, lim, wait{"will-cancel", ctx, 3, 1, false}) - // should get 3 tokens back, and have lim.tokens = 2 - t.Logf("tokens:%v last:%v lastEvent:%v", lim.tokens, lim.last, lim.lastEvent) - runWait(t, lim, wait{"act-now-after-cancel", context.Background(), 2, 0, true}) -} - -func TestWaitTimeout(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("flaky on Windows") - } - - lim := NewLimiter(10, 3) - - ctx, cancel := context.WithTimeout(context.Background(), d) - defer cancel() - runWait(t, lim, wait{"act-now", ctx, 2, 0, true}) - runWait(t, lim, wait{"w-timeout-err", ctx, 3, 0, false}) -} - -func TestWaitInf(t *testing.T) { - lim := NewLimiter(Inf, 0) - - runWait(t, lim, wait{"exceed-burst-no-error", context.Background(), 3, 0, true}) -} - -func BenchmarkAllowN(b *testing.B) { - lim := NewLimiter(Every(1*time.Second), 1) - now := time.Now() - b.ReportAllocs() - b.ResetTimer() - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - lim.AllowN(now, 1) - } - }) -} diff --git a/open.go b/open.go index c82b2d2bad..34494199c6 100644 --- a/open.go +++ b/open.go @@ -209,9 +209,9 @@ func Open(dirname string, opts *Options) (db *DB, _ error) { apply: d.commitApply, write: d.commitWrite, }) - d.deletionLimiter = rate.NewLimiter( - rate.Limit(d.opts.TargetByteDeletionRate), - d.opts.TargetByteDeletionRate) + if r := d.opts.TargetByteDeletionRate; r != 0 { + d.deletionLimiter = rate.NewLimiter(float64(r), float64(r)) + } d.mu.nextJobID = 1 d.mu.mem.nextSize = opts.MemTableSize if d.mu.mem.nextSize > initialMemTableSize { diff --git a/pacer.go b/pacer.go index 5cbba2bc6a..46c2986e4c 100644 --- a/pacer.go +++ b/pacer.go @@ -7,27 +7,12 @@ package pebble import ( "time" - "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/internal/rate" ) -var nilPacer = &noopPacer{} - -type limiter interface { - DelayN(now time.Time, n int) time.Duration - AllowN(now time.Time, n int) bool - Burst() int -} - -// pacer is the interface for flush and compaction rate limiters. The rate limiter -// is possible applied on each iteration step of a flush or compaction. This is to -// limit background IO usage so that it does not contend with foreground traffic. -type pacer interface { - maybeThrottle(bytesIterated uint64) error -} - // deletionPacerInfo contains any info from the db necessary to make deletion -// pacing decisions. +// pacing decisions (to limit background IO usage so that it does not contend +// with foreground traffic). type deletionPacerInfo struct { freeBytes uint64 obsoleteBytes uint64 @@ -40,17 +25,19 @@ type deletionPacerInfo struct { // negatively impacted if too many blocks are deleted very quickly, so this // mechanism helps mitigate that. type deletionPacer struct { - limiter limiter + limiter *rate.Limiter freeSpaceThreshold uint64 obsoleteBytesMaxRatio float64 getInfo func() deletionPacerInfo + + testingSleepFn func(delay time.Duration) } // newDeletionPacer instantiates a new deletionPacer for use when deleting // obsolete files. The limiter passed in must be a singleton shared across this // pebble instance. -func newDeletionPacer(limiter limiter, getInfo func() deletionPacerInfo) *deletionPacer { +func newDeletionPacer(limiter *rate.Limiter, getInfo func() deletionPacerInfo) *deletionPacer { return &deletionPacer{ limiter: limiter, // If there are less than freeSpaceThreshold bytes of free space on @@ -67,7 +54,7 @@ func newDeletionPacer(limiter limiter, getInfo func() deletionPacerInfo) *deleti // limit applies rate limiting if the current free disk space is more than // freeSpaceThreshold, and the ratio of obsolete to live bytes is less than // obsoleteBytesMaxRatio. -func (p *deletionPacer) limit(amount uint64, info deletionPacerInfo) error { +func (p *deletionPacer) limit(amount uint64, info deletionPacerInfo) { obsoleteBytesRatio := float64(1.0) if info.liveBytes > 0 { obsoleteBytesRatio = float64(info.obsoleteBytes) / float64(info.liveBytes) @@ -75,44 +62,14 @@ func (p *deletionPacer) limit(amount uint64, info deletionPacerInfo) error { paceDeletions := info.freeBytes > p.freeSpaceThreshold && obsoleteBytesRatio < p.obsoleteBytesMaxRatio if paceDeletions { - burst := p.limiter.Burst() - for amount > uint64(burst) { - d := p.limiter.DelayN(time.Now(), burst) - if d == rate.InfDuration { - return errors.Errorf("pacing failed") - } - time.Sleep(d) - amount -= uint64(burst) - } - d := p.limiter.DelayN(time.Now(), int(amount)) - if d == rate.InfDuration { - return errors.Errorf("pacing failed") - } - time.Sleep(d) + p.limiter.Wait(float64(amount)) } else { - burst := p.limiter.Burst() - for amount > uint64(burst) { - // AllowN will subtract burst if there are enough tokens available, - // else leave the tokens untouched. That is, we are making a - // best-effort to account for this activity in the limiter, but by - // ignoring the return value, we do the activity instantaneously - // anyway. - p.limiter.AllowN(time.Now(), burst) - amount -= uint64(burst) - } - p.limiter.AllowN(time.Now(), int(amount)) + p.limiter.Remove(float64(amount)) } - return nil } // maybeThrottle slows down a deletion of this file if it's faster than // opts.TargetByteDeletionRate. -func (p *deletionPacer) maybeThrottle(bytesToDelete uint64) error { - return p.limit(bytesToDelete, p.getInfo()) -} - -type noopPacer struct{} - -func (p *noopPacer) maybeThrottle(_ uint64) error { - return nil +func (p *deletionPacer) maybeThrottle(bytesToDelete uint64) { + p.limit(bytesToDelete, p.getInfo()) } diff --git a/pacer_test.go b/pacer_test.go index ec6a0ac55d..54de5bb430 100644 --- a/pacer_test.go +++ b/pacer_test.go @@ -13,92 +13,92 @@ import ( "time" "github.com/cockroachdb/datadriven" + "github.com/cockroachdb/pebble/internal/rate" ) -type mockPrintLimiter struct { - buf bytes.Buffer - burst int -} - -func (m *mockPrintLimiter) DelayN(now time.Time, n int) time.Duration { - fmt.Fprintf(&m.buf, "wait: %d\n", n) - return 0 -} - -func (m *mockPrintLimiter) AllowN(now time.Time, n int) bool { - fmt.Fprintf(&m.buf, "allow: %d\n", n) - return true -} +func TestCompactionPacerMaybeThrottle(t *testing.T) { + now := time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC) + var buf bytes.Buffer + nowFn := func() time.Time { + return now + } + sleepFn := func(d time.Duration) { + fmt.Fprintf(&buf, "wait: %s", d) + now = now.Add(d) + } -func (m *mockPrintLimiter) Burst() int { - return m.burst -} + var pacer *deletionPacer -func TestCompactionPacerMaybeThrottle(t *testing.T) { datadriven.RunTest(t, "testdata/compaction_pacer_maybe_throttle", func(t *testing.T, d *datadriven.TestData) string { + buf.Reset() switch d.Cmd { case "init": - if len(d.CmdArgs) != 1 { - return fmt.Sprintf("%s expects 1 argument", d.Cmd) - } - burst := uint64(1) - var bytesIterated uint64 var slowdownThreshold uint64 var freeBytes, liveBytes, obsoleteBytes uint64 - if len(d.Input) > 0 { - for _, data := range strings.Split(d.Input, "\n") { - parts := strings.Split(data, ":") - if len(parts) != 2 { - return fmt.Sprintf("malformed test:\n%s", d.Input) - } - varKey := parts[0] - varValue, err := strconv.ParseUint(strings.TrimSpace(parts[1]), 10, 64) - if err != nil { - return err.Error() - } + for _, data := range strings.Split(d.Input, "\n") { + parts := strings.Split(data, ":") + if len(parts) != 2 { + return fmt.Sprintf("malformed test:\n%s", d.Input) + } + varKey := parts[0] + varValue, err := strconv.ParseUint(strings.TrimSpace(parts[1]), 10, 64) + if err != nil { + return err.Error() + } - switch varKey { - case "burst": - burst = varValue - case "bytesIterated": - bytesIterated = varValue - case "slowdownThreshold": - slowdownThreshold = varValue - case "freeBytes": - freeBytes = varValue - case "liveBytes": - liveBytes = varValue - case "obsoleteBytes": - obsoleteBytes = varValue - default: - return fmt.Sprintf("unknown command: %s", varKey) - } + switch varKey { + case "burst": + burst = varValue + case "slowdownThreshold": + slowdownThreshold = varValue + case "freeBytes": + freeBytes = varValue + case "liveBytes": + liveBytes = varValue + case "obsoleteBytes": + obsoleteBytes = varValue + default: + return fmt.Sprintf("unknown argument: %s", varKey) } } - mockLimiter := mockPrintLimiter{burst: int(burst)} - switch d.CmdArgs[0].Key { - case "deletion": - getInfo := func() deletionPacerInfo { - return deletionPacerInfo{ - freeBytes: freeBytes, - liveBytes: liveBytes, - obsoleteBytes: obsoleteBytes, - } + getInfo := func() deletionPacerInfo { + return deletionPacerInfo{ + freeBytes: freeBytes, + liveBytes: liveBytes, + obsoleteBytes: obsoleteBytes, } - deletionPacer := newDeletionPacer(&mockLimiter, getInfo) - deletionPacer.freeSpaceThreshold = slowdownThreshold - err := deletionPacer.maybeThrottle(bytesIterated) + } + mockLimiter := rate.NewLimiterWithCustomTime(float64(burst), float64(burst), nowFn, sleepFn) + pacer = newDeletionPacer(mockLimiter, getInfo) + pacer.testingSleepFn = sleepFn + pacer.freeSpaceThreshold = slowdownThreshold + return "" + + case "delete": + var bytesToDelete uint64 + for _, data := range strings.Split(d.Input, "\n") { + parts := strings.Split(data, ":") + if len(parts) != 2 { + return fmt.Sprintf("malformed test:\n%s", d.Input) + } + varKey := parts[0] + varValue, err := strconv.ParseUint(strings.TrimSpace(parts[1]), 10, 64) if err != nil { return err.Error() } - return mockLimiter.buf.String() - default: - return fmt.Sprintf("unknown command: %s", d.Cmd) + switch varKey { + case "bytesToDelete": + bytesToDelete = varValue + default: + return fmt.Sprintf("unknown command: %s", varKey) + } } + pacer.maybeThrottle(bytesToDelete) + return buf.String() default: return fmt.Sprintf("unknown command: %s", d.Cmd) diff --git a/testdata/compaction_pacer_maybe_throttle b/testdata/compaction_pacer_maybe_throttle index 2f7ffe4795..d749197e3f 100644 --- a/testdata/compaction_pacer_maybe_throttle +++ b/testdata/compaction_pacer_maybe_throttle @@ -1,72 +1,98 @@ -init deletion +init burst: 10 -bytesIterated: 5 slowdownThreshold: 10 freeBytes: 100 obsoleteBytes: 1 liveBytes: 100 ---- -wait: 5 + +delete +bytesToDelete: 1 +---- + +delete +bytesToDelete: 1 +---- # As freeBytes > slowdownThreshold and obsoleteBytesRatio < 0.20, -# all 50 bytes should be asked to wait. +# the deletions should be throttled. init deletion burst: 10 -bytesIterated: 50 slowdownThreshold: 10 freeBytes: 100 obsoleteBytes: 1 liveBytes: 100 ---- -wait: 10 -wait: 10 -wait: 10 -wait: 10 -wait: 10 -# As freeBytes < slowdownThreshold, all 50 bytes should be allowed through. +delete +bytesToDelete: 10 +---- + +delete +bytesToDelete: 5 +---- +wait: 500ms + +delete +bytesToDelete: 50 +---- +wait: 1s + +delete +bytesToDelete: 10 +---- +wait: 5s + +# As freeBytes < slowdownThreshold, there should be no throttling. init deletion burst: 10 -bytesIterated: 50 slowdownThreshold: 10 freeBytes: 5 obsoleteBytes: 1 liveBytes: 100 ---- -allow: 10 -allow: 10 -allow: 10 -allow: 10 -allow: 10 -# As obsoleteBytesRatio > 0.20, all 50 bytes should be allowed through. +delete +bytesToDelete: 50 +---- + +delete +bytesToDelete: 50 +---- + + +# As obsoleteBytesRatio > 0.20, there should be no throttling. init deletion burst: 10 -bytesIterated: 50 slowdownThreshold: 10 freeBytes: 500 obsoleteBytes: 50 liveBytes: 100 ---- -allow: 10 -allow: 10 -allow: 10 -allow: 10 -allow: 10 -# When obsolete ratio unknown, all 50 bytes should be allowed through. +delete +bytesToDelete: 50 +---- + +delete +bytesToDelete: 50 +---- + +# When obsolete ratio unknown, there should be no throttling. init deletion burst: 10 -bytesIterated: 50 slowdownThreshold: 10 freeBytes: 500 ---- -allow: 10 -allow: 10 -allow: 10 -allow: 10 -allow: 10 + +delete +bytesToDelete: 50 +---- + +delete +bytesToDelete: 50 +----