Skip to content

Commit

Permalink
db: improve delete pacing
Browse files Browse the repository at this point in the history
Delete pacing is currently an on or off decision. If we are running
out of space or have too many obsolete bytes in relation to live
bytes, we disable pacing. Otherwise we pace at the configured rate
(128MB by default in CRDB).

This change improves pacing by keeping track of the average deletion
rate over the last 5 minutes and increasing the target rate to match
this rate if necessary. The intention is to avoid deletions lagging
behind.

Informs #2662.
  • Loading branch information
RaduBerinde committed Jun 30, 2023
1 parent b71662e commit c0a2ede
Show file tree
Hide file tree
Showing 5 changed files with 196 additions and 17 deletions.
13 changes: 8 additions & 5 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -3547,6 +3547,13 @@ func (d *DB) doDeleteObsoleteFiles(jobID int) {
}
}
if len(filesToDelete) > 0 {
var tablesSize uint64
for i := range filesToDelete {
if filesToDelete[i].fileType == fileTypeTable {
tablesSize += filesToDelete[i].fileSize
}
}
d.deletionPacer.reportDeletion(tablesSize)
d.deleters.Add(1)
// Delete asynchronously if that could get held up in the pacer.
if d.opts.Experimental.MinDeletionRate > 0 {
Expand All @@ -3561,15 +3568,11 @@ func (d *DB) doDeleteObsoleteFiles(jobID int) {
// must NOT be held when calling this method.
func (d *DB) paceAndDeleteObsoleteFiles(jobID int, files []obsoleteFile) {
defer d.deleters.Done()
pacer := (pacer)(nilPacer)
if d.opts.Experimental.MinDeletionRate > 0 {
pacer = newDeletionPacer(d.deletionLimiter, d.getDeletionPacerInfo)
}

for _, of := range files {
path := base.MakeFilepath(d.opts.FS, of.dir, of.fileType, of.fileNum)
if of.fileType == fileTypeTable {
_ = pacer.maybeThrottle(of.fileSize)
_ = d.deletionPacer.maybeThrottle(of.fileSize)
d.mu.Lock()
d.mu.versions.metrics.Table.ObsoleteCount--
d.mu.versions.metrics.Table.ObsoleteSize -= of.fileSize
Expand Down
2 changes: 1 addition & 1 deletion db.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ type DB struct {
closed *atomic.Value
closedCh chan struct{}

deletionLimiter limiter
deletionPacer pacer

// Async deletion jobs spawned by cleaners increment this WaitGroup, and
// call Done when completed. Once `d.mu.cleaning` is false, the db.Close()
Expand Down
9 changes: 5 additions & 4 deletions open.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/cockroachdb/pebble/internal/invariants"
"github.com/cockroachdb/pebble/internal/manifest"
"github.com/cockroachdb/pebble/internal/manual"
"github.com/cockroachdb/pebble/internal/rate"
"github.com/cockroachdb/pebble/objstorage"
"github.com/cockroachdb/pebble/record"
"github.com/cockroachdb/pebble/vfs"
Expand Down Expand Up @@ -208,9 +207,11 @@ func Open(dirname string, opts *Options) (db *DB, _ error) {
apply: d.commitApply,
write: d.commitWrite,
})
d.deletionLimiter = rate.NewLimiter(
rate.Limit(d.opts.Experimental.MinDeletionRate),
d.opts.Experimental.MinDeletionRate)
if d.opts.Experimental.MinDeletionRate > 0 {
d.deletionPacer = newDeletionPacer(int64(d.opts.Experimental.MinDeletionRate), d.getDeletionPacerInfo)
} else {
d.deletionPacer = nilPacer
}
d.mu.nextJobID = 1
d.mu.mem.nextSize = opts.MemTableSize
if d.mu.mem.nextSize > initialMemTableSize {
Expand Down
115 changes: 109 additions & 6 deletions pacer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package pebble

import (
"sync"
"time"

"github.com/cockroachdb/errors"
Expand All @@ -24,6 +25,8 @@ type limiter interface {
// limit background IO usage so that it does not contend with foreground traffic.
type pacer interface {
maybeThrottle(bytesIterated uint64) error

reportDeletion(bytesToDelete uint64)
}

// deletionPacerInfo contains any info from the db necessary to make deletion
Expand All @@ -40,28 +43,46 @@ type deletionPacerInfo struct {
// negatively impacted if too many blocks are deleted very quickly, so this
// mechanism helps mitigate that.
type deletionPacer struct {
limiter limiter
freeSpaceThreshold uint64
obsoleteBytesMaxRatio float64
limiter limiter
freeSpaceThreshold uint64
obsoleteBytesMaxRatio float64
targetByteDeletionRate int64

getInfo func() deletionPacerInfo

mu struct {
sync.Mutex

// history keeps rack of recent deletion history; it used to increase the
// deletion rate to match the pace of deletions.
history history
}
}

const deletePacerHistory = 5 * time.Minute

// newDeletionPacer instantiates a new deletionPacer for use when deleting
// obsolete files. The limiter passed in must be a singleton shared across this
// pebble instance.
func newDeletionPacer(limiter limiter, getInfo func() deletionPacerInfo) *deletionPacer {
return &deletionPacer{
limiter: limiter,
func newDeletionPacer(
targetByteDeletionRate int64, getInfo func() deletionPacerInfo,
) *deletionPacer {
d := &deletionPacer{
limiter: rate.NewLimiter(rate.Limit(targetByteDeletionRate), int(targetByteDeletionRate)),

// If there are less than freeSpaceThreshold bytes of free space on
// disk, do not pace deletions at all.
freeSpaceThreshold: 16 << 30, // 16 GB
// If the ratio of obsolete bytes to live bytes is greater than
// obsoleteBytesMaxRatio, do not pace deletions at all.
obsoleteBytesMaxRatio: 0.20,

targetByteDeletionRate: targetByteDeletionRate,

getInfo: getInfo,
}
d.mu.history.Init(time.Now(), deletePacerHistory)
return d
}

// limit applies rate limiting if the current free disk space is more than
Expand All @@ -74,6 +95,16 @@ func (p *deletionPacer) limit(amount uint64, info deletionPacerInfo) error {
}
paceDeletions := info.freeBytes > p.freeSpaceThreshold &&
obsoleteBytesRatio < p.obsoleteBytesMaxRatio

p.mu.Lock()
if historyRate := p.mu.history.Sum(time.Now()) / int64(deletePacerHistory/time.Second); historyRate > p.targetByteDeletionRate {
// Deletions have been happening at a rate higher than the target rate; we
// want to speed up deletions so they match the historic rate. We do this by
// decreasing the amount accordingly.
amount = uint64(float64(p.targetByteDeletionRate) / float64(historyRate) * float64(amount))
}
p.mu.Unlock()

if paceDeletions {
burst := p.limiter.Burst()
for amount > uint64(burst) {
Expand Down Expand Up @@ -111,8 +142,80 @@ func (p *deletionPacer) maybeThrottle(bytesToDelete uint64) error {
return p.limit(bytesToDelete, p.getInfo())
}

// reportDeletion is used to report a deletion to the pacer. The pacer uses it
// to keep track of the recent rate of deletions and potentially increase the
// deletion rate accordingly.
//
// reportDeletion is thread-safe.
func (p *deletionPacer) reportDeletion(bytesToDelete uint64) {
p.mu.Lock()
defer p.mu.Unlock()
p.mu.history.Add(time.Now(), int64(bytesToDelete))
}

type noopPacer struct{}

func (p *noopPacer) maybeThrottle(_ uint64) error {
return nil
}

func (p *noopPacer) reportDeletion(_ uint64) {}

// history is a helper used to keep track of the recent history of a set of
// data points (in our case deleted bytes), at limited granularity.
// Specifically, we split the desired timeframe into 100 "epochs" and all times
// are effectively rounded down to the nearest epoch boundary.
type history struct {
epochDuration time.Duration
startTime time.Time
// currEpoch is the epoch of the most recent operation.
currEpoch int64
// val contains the recent epoch values.
// val[currEpoch % historyEpochs] is the current epoch.
// val[(currEpoch + 1) % historyEpochs] is the oldest epoch.
val [historyEpochs]int64
// sum is always equal to the sum of values in val.
sum int64
}

const historyEpochs = 100

// Init the history helper to keep track of data over the given number of
// seconds.
func (h *history) Init(now time.Time, timeframe time.Duration) {
*h = history{
epochDuration: timeframe / time.Duration(historyEpochs),
startTime: now,
currEpoch: 0,
sum: 0,
}
}

// Add adds a value for the current time.
func (h *history) Add(now time.Time, val int64) {
h.advance(now)
h.val[h.currEpoch%historyEpochs] += val
h.sum += val
}

// Sum returns the sum of recent values. The result is approximate in that the
// cut-off time is within 1% of the exact one.
func (h *history) Sum(now time.Time) int64 {
h.advance(now)
return h.sum
}

func (h *history) epoch(t time.Time) int64 {
return int64(t.Sub(h.startTime) / h.epochDuration)
}

// advance advances the time to the given time.
func (h *history) advance(now time.Time) {
epoch := h.epoch(now)
for h.currEpoch < epoch {
h.currEpoch++
// Forget the data for the oldest epoch.
h.sum -= h.val[h.currEpoch%historyEpochs]
h.val[h.currEpoch%historyEpochs] = 0
}
}
74 changes: 73 additions & 1 deletion pacer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ package pebble
import (
"bytes"
"fmt"
"math/rand"
"sort"
"strconv"
"strings"
"testing"
Expand Down Expand Up @@ -88,7 +90,8 @@ func TestCompactionPacerMaybeThrottle(t *testing.T) {
obsoleteBytes: obsoleteBytes,
}
}
deletionPacer := newDeletionPacer(&mockLimiter, getInfo)
deletionPacer := newDeletionPacer(100, getInfo)
deletionPacer.limiter = &mockLimiter
deletionPacer.freeSpaceThreshold = slowdownThreshold
err := deletionPacer.maybeThrottle(bytesIterated)
if err != nil {
Expand All @@ -105,3 +108,72 @@ func TestCompactionPacerMaybeThrottle(t *testing.T) {
}
})
}

// TestDeletionPacerHistory tests the history helper by crosschecking Sum()
// against a naive implementation.
func TestDeletionPacerHistory(t *testing.T) {
type event struct {
time time.Time
// If report is 0, this event is a Sum(). Otherwise it is an Add().
report int64
}
numEvents := 1 + rand.Intn(200)
timeframe := time.Duration(1+rand.Intn(60*100)) * time.Second
events := make([]event, numEvents)
startTime := time.Now()
for i := range events {
events[i].time = startTime.Add(time.Duration(rand.Int63n(int64(timeframe))))
if rand.Intn(3) == 0 {
events[i].report = 0
} else {
events[i].report = int64(rand.Intn(100000))
}
}
sort.Slice(events, func(i, j int) bool {
return events[i].time.Before(events[j].time)
})

var h history
h.Init(startTime, timeframe)

// partialSums[i] := SUM_j<i events[j].report
partialSums := make([]int64, len(events)+1)
for i := range events {
partialSums[i+1] = partialSums[i] + events[i].report
}

for i, e := range events {
if e.report != 0 {
h.Add(e.time, e.report)
continue
}

result := h.Sum(e.time)

// getIdx returns the largest event index <= i that is before the cutoff
// time.
getIdx := func(cutoff time.Time) int {
for j := i; j >= 0; j-- {
if events[j].time.Before(cutoff) {
return j
}
}
return -1
}

// Sum all report values in the last timeframe, and see if recent events
// (allowing 1% error in the cutoff time) match the result.
a := getIdx(e.time.Add(-timeframe * (historyEpochs + 1) / historyEpochs))
b := getIdx(e.time.Add(-timeframe * (historyEpochs - 1) / historyEpochs))
found := false
for j := a; j <= b; j++ {
if partialSums[i+1]-partialSums[j+1] == result {
found = true
break
}
}
if !found {
t.Fatalf("incorrect Sum() result %d; %v", result, events[a+1:i+1])
}
}
}

0 comments on commit c0a2ede

Please sign in to comment.