From 6ef3bb1b842b34c141e189b9c7e4ab7d77b5de93 Mon Sep 17 00:00:00 2001 From: Radu Berinde Date: Wed, 28 Jun 2023 14:31:07 -0700 Subject: [PATCH] db: improve delete pacing Delete pacing is currently an on or off decision. If we are running out of space or have too many obsolete bytes in relation to live bytes, we disable pacing. Otherwise we pace at the configured rate (128MB by default in CRDB). This change improves pacing by keeping track of the average deletion rate over the last 5 minutes and increasing the target rate to match this rate if necessary. The intention is to avoid deletions lagging behind. Informs #2662. --- compaction.go | 13 +++--- db.go | 2 +- open.go | 9 ++-- pacer.go | 115 +++++++++++++++++++++++++++++++++++++++++++++++--- pacer_test.go | 74 +++++++++++++++++++++++++++++++- 5 files changed, 196 insertions(+), 17 deletions(-) diff --git a/compaction.go b/compaction.go index b18095d8ca..16bac21266 100644 --- a/compaction.go +++ b/compaction.go @@ -3547,6 +3547,13 @@ func (d *DB) doDeleteObsoleteFiles(jobID int) { } } if len(filesToDelete) > 0 { + var tablesSize uint64 + for i := range filesToDelete { + if filesToDelete[i].fileType == fileTypeTable { + tablesSize += filesToDelete[i].fileSize + } + } + d.deletionPacer.reportDeletion(tablesSize) d.deleters.Add(1) // Delete asynchronously if that could get held up in the pacer. if d.opts.Experimental.MinDeletionRate > 0 { @@ -3561,15 +3568,11 @@ func (d *DB) doDeleteObsoleteFiles(jobID int) { // must NOT be held when calling this method. func (d *DB) paceAndDeleteObsoleteFiles(jobID int, files []obsoleteFile) { defer d.deleters.Done() - pacer := (pacer)(nilPacer) - if d.opts.Experimental.MinDeletionRate > 0 { - pacer = newDeletionPacer(d.deletionLimiter, d.getDeletionPacerInfo) - } for _, of := range files { path := base.MakeFilepath(d.opts.FS, of.dir, of.fileType, of.fileNum) if of.fileType == fileTypeTable { - _ = pacer.maybeThrottle(of.fileSize) + _ = d.deletionPacer.maybeThrottle(of.fileSize) d.mu.Lock() d.mu.versions.metrics.Table.ObsoleteCount-- d.mu.versions.metrics.Table.ObsoleteSize -= of.fileSize diff --git a/db.go b/db.go index d572245719..9ec2d8ba4e 100644 --- a/db.go +++ b/db.go @@ -297,7 +297,7 @@ type DB struct { closed *atomic.Value closedCh chan struct{} - deletionLimiter limiter + deletionPacer pacer // Async deletion jobs spawned by cleaners increment this WaitGroup, and // call Done when completed. 
Once `d.mu.cleaning` is false, the db.Close()
diff --git a/open.go b/open.go
index 58bfa53341..b1ad596af1 100644
--- a/open.go
+++ b/open.go
@@ -23,7 +23,6 @@ import (
 	"github.com/cockroachdb/pebble/internal/invariants"
 	"github.com/cockroachdb/pebble/internal/manifest"
 	"github.com/cockroachdb/pebble/internal/manual"
-	"github.com/cockroachdb/pebble/internal/rate"
 	"github.com/cockroachdb/pebble/objstorage"
 	"github.com/cockroachdb/pebble/record"
 	"github.com/cockroachdb/pebble/vfs"
@@ -208,9 +207,11 @@ func Open(dirname string, opts *Options) (db *DB, _ error) {
 		apply: d.commitApply,
 		write: d.commitWrite,
 	})
-	d.deletionLimiter = rate.NewLimiter(
-		rate.Limit(d.opts.Experimental.MinDeletionRate),
-		d.opts.Experimental.MinDeletionRate)
+	if d.opts.Experimental.MinDeletionRate > 0 {
+		d.deletionPacer = newDeletionPacer(int64(d.opts.Experimental.MinDeletionRate), d.getDeletionPacerInfo)
+	} else {
+		d.deletionPacer = nilPacer
+	}
 	d.mu.nextJobID = 1
 	d.mu.mem.nextSize = opts.MemTableSize
 	if d.mu.mem.nextSize > initialMemTableSize {
diff --git a/pacer.go b/pacer.go
index 8d3d17f857..7546e56cbf 100644
--- a/pacer.go
+++ b/pacer.go
@@ -5,6 +5,7 @@
 package pebble
 
 import (
+	"sync"
 	"time"
 
 	"github.com/cockroachdb/errors"
@@ -24,6 +25,8 @@ type limiter interface {
 // limit background IO usage so that it does not contend with foreground traffic.
 type pacer interface {
 	maybeThrottle(bytesIterated uint64) error
+
+	reportDeletion(bytesToDelete uint64)
 }
 
 // deletionPacerInfo contains any info from the db necessary to make deletion
@@ -40,19 +43,33 @@ type deletionPacerInfo struct {
 // negatively impacted if too many blocks are deleted very quickly, so this
 // mechanism helps mitigate that.
 type deletionPacer struct {
-	limiter               limiter
-	freeSpaceThreshold    uint64
-	obsoleteBytesMaxRatio float64
+	limiter                limiter
+	freeSpaceThreshold     uint64
+	obsoleteBytesMaxRatio  float64
+	targetByteDeletionRate int64
 
 	getInfo func() deletionPacerInfo
+
+	mu struct {
+		sync.Mutex
+
+		// history keeps track of recent deletions; it is used to increase the
+		// deletion rate to match the observed pace of deletions.
+		history history
+	}
 }
 
+const deletePacerHistory = 5 * time.Minute
+
 // newDeletionPacer instantiates a new deletionPacer for use when deleting
-// obsolete files. The limiter passed in must be a singleton shared across this
-// pebble instance.
-func newDeletionPacer(limiter limiter, getInfo func() deletionPacerInfo) *deletionPacer {
-	return &deletionPacer{
-		limiter: limiter,
+// obsolete files.
+func newDeletionPacer(
+	targetByteDeletionRate int64, getInfo func() deletionPacerInfo,
+) *deletionPacer {
+	d := &deletionPacer{
+		limiter: rate.NewLimiter(rate.Limit(targetByteDeletionRate), int(targetByteDeletionRate)),
+
 		// If there are less than freeSpaceThreshold bytes of free space on
 		// disk, do not pace deletions at all.
 		freeSpaceThreshold: 16 << 30, // 16 GB
@@ -60,8 +77,12 @@ func newDeletionPacer(limiter limiter, getInfo func() deletionPacerInfo) *deletionPacer {
 		// obsoleteBytesMaxRatio, do not pace deletions at all.
 		obsoleteBytesMaxRatio: 0.20,
 
+		targetByteDeletionRate: targetByteDeletionRate,
+		getInfo:                getInfo,
 	}
+	d.mu.history.Init(time.Now(), deletePacerHistory)
+	return d
 }
 
 // limit applies rate limiting if the current free disk space is more than
@@ -74,6 +95,16 @@ func (p *deletionPacer) limit(amount uint64, info deletionPacerInfo) error {
 	}
 	paceDeletions := info.freeBytes > p.freeSpaceThreshold &&
 		obsoleteBytesRatio < p.obsoleteBytesMaxRatio
+
+	p.mu.Lock()
+	if historyRate := p.mu.history.Sum(time.Now()) / int64(deletePacerHistory/time.Second); historyRate > p.targetByteDeletionRate {
+		// Deletions have been happening at a rate higher than the target rate; we
+		// want to speed up deletions so they match the historic rate. We do this
+		// by scaling down the amount charged against the limiter by the ratio of
+		// the target rate to the historic rate.
+		amount = uint64(float64(p.targetByteDeletionRate) / float64(historyRate) * float64(amount))
+	}
+	p.mu.Unlock()
+
 	if paceDeletions {
 		burst := p.limiter.Burst()
 		for amount > uint64(burst) {
@@ -111,8 +142,80 @@ func (p *deletionPacer) maybeThrottle(bytesToDelete uint64) error {
 	return p.limit(bytesToDelete, p.getInfo())
 }
 
+// reportDeletion is used to report a deletion to the pacer. The pacer uses it
+// to keep track of the recent rate of deletions and potentially increase the
+// deletion rate accordingly.
+//
+// reportDeletion is thread-safe.
+func (p *deletionPacer) reportDeletion(bytesToDelete uint64) {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	p.mu.history.Add(time.Now(), int64(bytesToDelete))
+}
+
 type noopPacer struct{}
 
 func (p *noopPacer) maybeThrottle(_ uint64) error {
 	return nil
 }
+
+func (p *noopPacer) reportDeletion(_ uint64) {}
+
+// history is a helper used to keep track of the recent history of a set of
+// data points (in our case deleted bytes), at limited granularity.
+// Specifically, we split the desired timeframe into 100 "epochs" and all times
+// are effectively rounded down to the nearest epoch boundary.
+type history struct {
+	epochDuration time.Duration
+	startTime     time.Time
+	// currEpoch is the epoch of the most recent operation.
+	currEpoch int64
+	// val contains the recent epoch values.
+	// val[currEpoch % historyEpochs] is the current epoch.
+	// val[(currEpoch + 1) % historyEpochs] is the oldest epoch.
+	val [historyEpochs]int64
+	// sum is always equal to the sum of values in val.
+	sum int64
+}
+
+const historyEpochs = 100
+
+// Init initializes the history helper to keep track of data over the given
+// timeframe.
+func (h *history) Init(now time.Time, timeframe time.Duration) {
+	*h = history{
+		epochDuration: timeframe / time.Duration(historyEpochs),
+		startTime:     now,
+		currEpoch:     0,
+		sum:           0,
+	}
+}
+
+// Add adds a value for the current time.
+func (h *history) Add(now time.Time, val int64) {
+	h.advance(now)
+	h.val[h.currEpoch%historyEpochs] += val
+	h.sum += val
+}
+
+// Sum returns the sum of recent values. The result is approximate in that the
+// cut-off time is within 1% of the exact one.
+func (h *history) Sum(now time.Time) int64 {
+	h.advance(now)
+	return h.sum
+}
+
+func (h *history) epoch(t time.Time) int64 {
+	return int64(t.Sub(h.startTime) / h.epochDuration)
+}
+
+// advance moves the current epoch forward to the epoch containing the given
+// time.
+func (h *history) advance(now time.Time) {
+	epoch := h.epoch(now)
+	for h.currEpoch < epoch {
+		h.currEpoch++
+		// Forget the data for the oldest epoch.
+		h.sum -= h.val[h.currEpoch%historyEpochs]
+		h.val[h.currEpoch%historyEpochs] = 0
+	}
+}
diff --git a/pacer_test.go b/pacer_test.go
index ec6a0ac55d..3d267ada60 100644
--- a/pacer_test.go
+++ b/pacer_test.go
@@ -7,6 +7,8 @@ package pebble
 import (
 	"bytes"
 	"fmt"
+	"math/rand"
+	"sort"
 	"strconv"
 	"strings"
 	"testing"
@@ -88,7 +90,8 @@ func TestCompactionPacerMaybeThrottle(t *testing.T) {
 				obsoleteBytes: obsoleteBytes,
 			}
 		}
-		deletionPacer := newDeletionPacer(&mockLimiter, getInfo)
+		deletionPacer := newDeletionPacer(100, getInfo)
+		deletionPacer.limiter = &mockLimiter
 		deletionPacer.freeSpaceThreshold = slowdownThreshold
 		err := deletionPacer.maybeThrottle(bytesIterated)
 		if err != nil {
@@ -105,3 +108,72 @@
 		}
 	})
 }
+
+// TestDeletionPacerHistory tests the history helper by crosschecking Sum()
+// against a naive implementation.
+func TestDeletionPacerHistory(t *testing.T) {
+	type event struct {
+		time time.Time
+		// If report is 0, this event is a Sum(). Otherwise it is an Add().
+		report int64
+	}
+	numEvents := 1 + rand.Intn(200)
+	timeframe := time.Duration(1+rand.Intn(60*100)) * time.Second
+	events := make([]event, numEvents)
+	startTime := time.Now()
+	for i := range events {
+		events[i].time = startTime.Add(time.Duration(rand.Int63n(int64(timeframe))))
+		if rand.Intn(3) == 0 {
+			events[i].report = 0
+		} else {
+			events[i].report = int64(rand.Intn(100000))
+		}
+	}
+	sort.Slice(events, func(i, j int) bool {
+		return events[i].time.Before(events[j].time)
+	})
+
+	var h history
+	h.Init(startTime, timeframe)
+
+	// partialSums[i] := SUM_{j < i} events[j].report
+	partialSums := make([]int64, len(events)+1)
+	for i := range events {
+		partialSums[i+1] = partialSums[i] + events[i].report
+	}
+
+	for i, e := range events {
+		if e.report != 0 {
+			h.Add(e.time, e.report)
+			continue
+		}
+		result := h.Sum(e.time)
+
+		// getIdx returns the largest index j <= i for which events[j] happened
+		// strictly before the cutoff time, or -1 if there is no such event.
+		getIdx := func(cutoff time.Time) int {
+			for j := i; j >= 0; j-- {
+				if events[j].time.Before(cutoff) {
+					return j
+				}
+			}
+			return -1
+		}
+
+		// Sum all report values in the last timeframe, and see if recent events
+		// (allowing 1% error in the cutoff time) match the result.
+		a := getIdx(e.time.Add(-timeframe * (historyEpochs + 1) / historyEpochs))
+		b := getIdx(e.time.Add(-timeframe * (historyEpochs - 1) / historyEpochs))
+		found := false
+		for j := a; j <= b; j++ {
+			if partialSums[i+1]-partialSums[j+1] == result {
+				found = true
+				break
+			}
+		}
+		if !found {
+			t.Fatalf("incorrect Sum() result %d; %v", result, events[a+1:i+1])
+		}
+	}
+}
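
Appendix (illustration only, not part of the patch): the change combines two mechanisms, a fixed-size epoch ring that approximates a sliding five-minute sum of deleted bytes, and proportional scaling of the byte count charged against the rate limiter when the historic deletion rate exceeds the target. The following self-contained Go sketch demonstrates both under those assumptions; names such as ringHistory and scaleAmount are illustrative and do not appear in Pebble.

package main

import (
	"fmt"
	"time"
)

// epochs is the granularity of the sliding window, matching historyEpochs in
// the patch.
const epochs = 100

// ringHistory (hypothetical name) approximates a sliding-window sum by
// bucketing values into fixed-width epochs and forgetting the oldest bucket
// as time advances, mirroring the history helper added in pacer.go.
type ringHistory struct {
	epochDuration time.Duration
	startTime     time.Time
	currEpoch     int64
	val           [epochs]int64
	sum           int64
}

func newRingHistory(now time.Time, window time.Duration) *ringHistory {
	return &ringHistory{epochDuration: window / epochs, startTime: now}
}

// advance rotates the ring forward to the epoch containing now, zeroing the
// buckets that fall out of the window.
func (h *ringHistory) advance(now time.Time) {
	target := int64(now.Sub(h.startTime) / h.epochDuration)
	for h.currEpoch < target {
		h.currEpoch++
		h.sum -= h.val[h.currEpoch%epochs]
		h.val[h.currEpoch%epochs] = 0
	}
}

// add records v at time now.
func (h *ringHistory) add(now time.Time, v int64) {
	h.advance(now)
	h.val[h.currEpoch%epochs] += v
	h.sum += v
}

// windowSum returns the approximate sum over the last window.
func (h *ringHistory) windowSum(now time.Time) int64 {
	h.advance(now)
	return h.sum
}

// scaleAmount shrinks the byte count charged against the rate limiter when
// the observed deletion rate exceeds the target; charging fewer bytes per
// real byte deleted effectively raises the pacer's throughput to the
// historic rate.
func scaleAmount(amount uint64, targetRate, historyRate int64) uint64 {
	if historyRate <= targetRate {
		return amount
	}
	return uint64(float64(targetRate) / float64(historyRate) * float64(amount))
}

func main() {
	const window = 5 * time.Minute
	const targetRate = 128 << 20 // 128 MB/s, the CRDB default cited above

	now := time.Now()
	h := newRingHistory(now, window)

	// Simulate one minute of heavy deletions: 1 GB reported every second.
	for i := 0; i < 60; i++ {
		h.add(now.Add(time.Duration(i)*time.Second), 1<<30)
	}
	later := now.Add(time.Minute)

	// Average the window sum over the full 5 minutes, as limit() does.
	historyRate := h.windowSum(later) / int64(window/time.Second)
	fmt.Printf("historic rate: %d B/s\n", historyRate)
	fmt.Printf("10 MB charged as %d bytes\n", scaleAmount(10<<20, targetRate, historyRate))
}

Running this prints a historic rate of about 205 MB/s, so a 10 MB deletion is charged as roughly 6.25 MB; the limiter then admits deletions about 1.6x faster than the configured 128 MB/s target, which is the same adjustment limit() applies before waiting on the limiter.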