diff --git a/cleaner.go b/cleaner.go
index 03557ce3ee..8cdd97ee3d 100644
--- a/cleaner.go
+++ b/cleaner.go
@@ -74,7 +74,7 @@ func openCleanupManager(
 		opts:            opts,
 		objProvider:     objProvider,
 		onTableDeleteFn: onTableDeleteFn,
-		deletePacer:     newDeletionPacer(getDeletePacerInfo),
+		deletePacer:     newDeletionPacer(time.Now(), int64(opts.TargetByteDeletionRate), getDeletePacerInfo),
 		jobsCh:          make(chan *cleanupJob, jobsChLen),
 	}
 	cm.mu.completedJobsCond.L = &cm.mu.Mutex
@@ -102,6 +102,21 @@ func (cm *cleanupManager) EnqueueJob(jobID int, obsoleteFiles []obsoleteFile) {
 		jobID:         jobID,
 		obsoleteFiles: obsoleteFiles,
 	}
+
+	// Report deleted bytes to the pacer, which can use this data to potentially
+	// increase the deletion rate to keep up. We want to do this at enqueue time
+	// rather than when we get to the job; otherwise the reported bytes would be
+	// subject to the throttling rate, which defeats the purpose.
+	var pacingBytes uint64
+	for _, of := range obsoleteFiles {
+		if cm.needsPacing(of.fileType, of.fileNum) {
+			pacingBytes += of.fileSize
+		}
+	}
+	if pacingBytes > 0 {
+		cm.deletePacer.ReportDeletion(time.Now(), pacingBytes)
+	}
+
 	cm.mu.Lock()
 	cm.mu.queuedJobs++
 	cm.mu.Unlock()
@@ -130,23 +145,17 @@ func (cm *cleanupManager) Wait() {
 // mainLoop runs the manager's background goroutine.
 func (cm *cleanupManager) mainLoop() {
 	defer cm.waitGroup.Done()
-	useLimiter := false
-	var limiter tokenbucket.TokenBucket
-
-	if r := cm.opts.TargetByteDeletionRate; r != 0 {
-		useLimiter = true
-		limiter.Init(tokenbucket.TokensPerSecond(r), tokenbucket.Tokens(r))
-	}
+	var tb tokenbucket.TokenBucket
+	// Use a token bucket with a 1 token/second refill rate and a 1 token burst.
+	tb.Init(1.0, 1.0)
 
 	for job := range cm.jobsCh {
 		for _, of := range job.obsoleteFiles {
 			if of.fileType != fileTypeTable {
 				path := base.MakeFilepath(cm.opts.FS, of.dir, of.fileType, of.fileNum)
 				cm.deleteObsoleteFile(of.fileType, job.jobID, path, of.fileNum, of.fileSize)
 			} else {
-				if useLimiter {
-					cm.maybePace(&limiter, of.fileType, of.fileNum, of.fileSize)
-				}
+				cm.maybePace(&tb, of.fileType, of.fileNum, of.fileSize)
 				cm.onTableDeleteFn(of.fileSize)
 				cm.deleteObsoleteObject(fileTypeTable, job.jobID, of.fileNum)
 			}
@@ -158,33 +167,40 @@
-// maybePace sleeps before deleting an object if appropriate. It is always
-// called from the background goroutine.
-func (cm *cleanupManager) maybePace(
-	limiter *tokenbucket.TokenBucket,
-	fileType base.FileType,
-	fileNum base.DiskFileNum,
-	fileSize uint64,
-) {
+func (cm *cleanupManager) needsPacing(fileType base.FileType, fileNum base.DiskFileNum) bool {
+	if fileType != fileTypeTable {
+		return false
+	}
 	meta, err := cm.objProvider.Lookup(fileType, fileNum)
 	if err != nil {
 		// The object was already removed from the provider; we won't actually
 		// delete anything, so we don't need to pace.
-		return
+		return false
 	}
-	if meta.IsShared() {
-		// Don't throttle deletion of shared objects.
+	// Don't throttle deletion of shared objects.
+	return !meta.IsShared()
+}
+
+// maybePace sleeps before deleting an object if appropriate. It is always
+// called from the background goroutine.
+func (cm *cleanupManager) maybePace(
+	tb *tokenbucket.TokenBucket, fileType base.FileType, fileNum base.DiskFileNum, fileSize uint64,
+) {
+	if !cm.needsPacing(fileType, fileNum) {
 		return
 	}
-	if !cm.deletePacer.shouldPace() {
-		// The deletion pacer decided that we shouldn't throttle; account
-		// for the operation but don't wait for tokens.
-		limiter.Adjust(-tokenbucket.Tokens(fileSize))
+
+	tokens := cm.deletePacer.PacingDelay(time.Now(), fileSize)
+	if tokens == 0.0 {
+		// The token bucket might be in debt; it could make us wait even for 0
+		// tokens. We don't want that if the pacer decided throttling should be
+		// disabled.
 		return
 	}
-	// Wait for tokens.
+	// Wait for tokens. We use a token bucket instead of sleeping outright because
+	// the token bucket accumulates up to one second of unused tokens.
 	for {
-		ok, d := limiter.TryToFulfill(tokenbucket.Tokens(fileSize))
+		ok, d := tb.TryToFulfill(tokenbucket.Tokens(tokens))
 		if ok {
 			break
 		}
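With pacing moved into the pacer itself, the bucket's unit of account changes: `PacingDelay` returns a wait expressed in seconds, and because the shared bucket refills at exactly 1 token/second with a 1-token burst, seconds map 1:1 onto tokens while up to one second of unused credit carries over between deletes. A minimal standalone sketch of that wait pattern, using a toy bucket (illustrative names, not the cockroachdb/tokenbucket implementation used above):

```go
package main

import (
	"fmt"
	"time"
)

// toyBucket mimics the 1 token/sec, 1 token burst bucket in mainLoop: unused
// wait "credit" accumulates (up to the burst), so many tiny deletes that each
// need only a few milliseconds of pacing don't all sleep individually.
type toyBucket struct {
	tokens   float64   // current tokens, capped at burst
	burst    float64   // maximum accumulated tokens (1.0 above)
	lastTick time.Time // last refill time
}

func (b *toyBucket) tryToFulfill(now time.Time, amount float64) (ok bool, wait time.Duration) {
	// Refill at 1 token per second.
	b.tokens += now.Sub(b.lastTick).Seconds()
	if b.tokens > b.burst {
		b.tokens = b.burst
	}
	b.lastTick = now
	if b.tokens >= amount {
		b.tokens -= amount
		return true, 0
	}
	// Not enough credit; report how long until there is.
	return false, time.Duration((amount - b.tokens) * float64(time.Second))
}

func main() {
	tb := toyBucket{burst: 1.0, lastTick: time.Now()}
	// Suppose the pacer asked for a 0.3s delay (e.g. 30MB at 100MB/s).
	for {
		ok, d := tb.tryToFulfill(time.Now(), 0.3)
		if ok {
			break
		}
		time.Sleep(d)
	}
	fmt.Println("paced delete can proceed")
}
```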
diff --git a/pacer.go b/pacer.go
index 80c9f1229f..c1f78f0109 100644
--- a/pacer.go
+++ b/pacer.go
@@ -4,6 +4,11 @@
 
 package pebble
 
+import (
+	"sync"
+	"time"
+)
+
 // deletionPacerInfo contains any info from the db necessary to make deletion
 // pacing decisions (to limit background IO usage so that it does not contend
 // with foreground traffic).
@@ -19,36 +24,150 @@ type deletionPacerInfo struct {
 // negatively impacted if too many blocks are deleted very quickly, so this
 // mechanism helps mitigate that.
 type deletionPacer struct {
-	freeSpaceThreshold uint64
+	// If there are less than freeSpaceThreshold bytes of free space on
+	// disk, do not pace deletions at all.
+	freeSpaceThreshold uint64
+
+	// If the ratio of obsolete bytes to live bytes is greater than
+	// obsoleteBytesMaxRatio, do not pace deletions at all.
 	obsoleteBytesMaxRatio float64
 
+	mu struct {
+		sync.Mutex
+
+		// history keeps track of recent deletion history; it is used to
+		// increase the deletion rate to match the pace of deletions.
+		history history
+	}
+
+	targetByteDeletionRate int64
+
 	getInfo func() deletionPacerInfo
 }
 
+const deletePacerHistory = 5 * time.Minute
+
 // newDeletionPacer instantiates a new deletionPacer for use when deleting
-// obsolete files. The limiter passed in must be a singleton shared across this
-// pebble instance.
-func newDeletionPacer(getInfo func() deletionPacerInfo) *deletionPacer {
-	return &deletionPacer{
-		// If there are less than freeSpaceThreshold bytes of free space on
-		// disk, do not pace deletions at all.
-		freeSpaceThreshold: 16 << 30, // 16 GB
-		// If the ratio of obsolete bytes to live bytes is greater than
-		// obsoleteBytesMaxRatio, do not pace deletions at all.
+// obsolete files.
+//
+// targetByteDeletionRate is the rate (in bytes/sec) at which we want to
+// normally limit deletes (when we are not falling behind or running out of
+// space). A value of 0.0 disables pacing.
+func newDeletionPacer(
+	now time.Time, targetByteDeletionRate int64, getInfo func() deletionPacerInfo,
+) *deletionPacer {
+	d := &deletionPacer{
+		freeSpaceThreshold:    16 << 30, // 16 GB
 		obsoleteBytesMaxRatio: 0.20,
-		getInfo: getInfo,
+
+		targetByteDeletionRate: targetByteDeletionRate,
+		getInfo:                getInfo,
 	}
+	d.mu.history.Init(now, deletePacerHistory)
+	return d
+}
+
+// ReportDeletion is used to report a deletion to the pacer. The pacer uses it
+// to keep track of the recent rate of deletions and potentially increase the
+// deletion rate accordingly.
+//
+// ReportDeletion is thread-safe.
+func (p *deletionPacer) ReportDeletion(now time.Time, bytesToDelete uint64) {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	p.mu.history.Add(now, int64(bytesToDelete))
 }
 
-// shouldPace returns true if we should apply rate limiting; this is the
-// case when the current free disk space is more than freeSpaceThreshold, and
-// the ratio of obsolete to live bytes is less than obsoleteBytesMaxRatio.
-func (p *deletionPacer) shouldPace() bool {
+// PacingDelay returns the recommended pacing wait time (in seconds) for
+// deleting the given number of bytes.
+//
+// PacingDelay is thread-safe.
+func (p *deletionPacer) PacingDelay(now time.Time, bytesToDelete uint64) (waitSeconds float64) {
+	if p.targetByteDeletionRate == 0 {
+		// Pacing disabled.
+		return 0.0
+	}
+
 	info := p.getInfo()
-	obsoleteBytesRatio := float64(1.0)
+	if info.freeBytes <= p.freeSpaceThreshold {
+		return 0.0
+	}
+	obsoleteBytesRatio := 1.0
 	if info.liveBytes > 0 {
 		obsoleteBytesRatio = float64(info.obsoleteBytes) / float64(info.liveBytes)
 	}
-	return info.freeBytes > p.freeSpaceThreshold && obsoleteBytesRatio < p.obsoleteBytesMaxRatio
+	if obsoleteBytesRatio >= p.obsoleteBytesMaxRatio {
+		return 0.0
+	}
+
+	rate := p.targetByteDeletionRate
+
+	// See if the recent deletion rate is higher than our target; if so, use the
+	// recent rate as our target so that we don't fall behind.
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	if historyRate := p.mu.history.Sum(now) / int64(deletePacerHistory/time.Second); rate < historyRate {
+		rate = historyRate
+	}
+
+	return float64(bytesToDelete) / float64(rate)
+}
+
+// history is a helper used to keep track of the recent history of a set of
+// data points (in our case deleted bytes), at limited granularity.
+// Specifically, we split the desired timeframe into 100 "epochs" and all times
+// are effectively rounded down to the nearest epoch boundary.
+type history struct {
+	epochDuration time.Duration
+	startTime     time.Time
+	// currEpoch is the epoch of the most recent operation.
+	currEpoch int64
+	// val contains the recent epoch values.
+	// val[currEpoch % historyEpochs] is the current epoch.
+	// val[(currEpoch + 1) % historyEpochs] is the oldest epoch.
+	val [historyEpochs]int64
+	// sum is always equal to the sum of values in val.
+	sum int64
+}
+
+const historyEpochs = 100
+
+// Init the history helper to keep track of data over the given timeframe.
+func (h *history) Init(now time.Time, timeframe time.Duration) {
+	*h = history{
+		epochDuration: timeframe / time.Duration(historyEpochs),
+		startTime:     now,
+		currEpoch:     0,
+		sum:           0,
+	}
+}
+
+// Add adds a value for the current time.
+func (h *history) Add(now time.Time, val int64) {
+	h.advance(now)
+	h.val[h.currEpoch%historyEpochs] += val
+	h.sum += val
+}
+
+// Sum returns the sum of recent values. The result is approximate in that the
+// cut-off time is within 1% of the exact one.
+func (h *history) Sum(now time.Time) int64 {
+	h.advance(now)
+	return h.sum
+}
+
+func (h *history) epoch(t time.Time) int64 {
+	return int64(t.Sub(h.startTime) / h.epochDuration)
+}
+
+// advance advances the internal time to the given time.
+func (h *history) advance(now time.Time) {
+	epoch := h.epoch(now)
+	for h.currEpoch < epoch {
+		h.currEpoch++
+		// Forget the data for the oldest epoch.
+		h.sum -= h.val[h.currEpoch%historyEpochs]
+		h.val[h.currEpoch%historyEpochs] = 0
+	}
+}
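Since the pacer's rate escalation hinges on this ring buffer, here is a scaled-down, self-contained sketch of its behavior: 4 epochs instead of 100 so the roll-off is visible. The `miniHistory` names are illustrative, not Pebble code:

```go
package main

import (
	"fmt"
	"time"
)

// miniHistory mirrors the history type above: values land in the bucket for
// the current epoch, and advancing past an old bucket subtracts it from the
// running sum, so sumAt() reflects roughly the last `epochs * epochDur` of data.
const epochs = 4

type miniHistory struct {
	epochDur  time.Duration
	start     time.Time
	currEpoch int64
	val       [epochs]int64
	sum       int64
}

func (h *miniHistory) add(now time.Time, v int64) {
	h.advance(now)
	h.val[h.currEpoch%epochs] += v
	h.sum += v
}

func (h *miniHistory) sumAt(now time.Time) int64 {
	h.advance(now)
	return h.sum
}

func (h *miniHistory) advance(now time.Time) {
	epoch := int64(now.Sub(h.start) / h.epochDur)
	for h.currEpoch < epoch {
		h.currEpoch++
		h.sum -= h.val[h.currEpoch%epochs] // forget the oldest bucket
		h.val[h.currEpoch%epochs] = 0
	}
}

func main() {
	start := time.Now()
	h := miniHistory{epochDur: time.Second, start: start} // 4-second window
	h.add(start, 10)
	h.add(start.Add(3*time.Second), 5)
	fmt.Println(h.sumAt(start.Add(3 * time.Second))) // 15: both values in window
	fmt.Println(h.sumAt(start.Add(5 * time.Second))) // 5: the 10 rolled off
}
```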
diff --git a/pacer_test.go b/pacer_test.go
index 7c94d4db47..666ac2c2a6 100644
--- a/pacer_test.go
+++ b/pacer_test.go
@@ -6,49 +6,78 @@ package pebble
 
 import (
 	"fmt"
+	"math/rand"
+	"sort"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/require"
 )
 
-func TestDeletionPacerMaybeThrottle(t *testing.T) {
+func TestDeletionPacer(t *testing.T) {
+	const MB = 1 << 20
+	const GB = 1 << 30
 	testCases := []struct {
-		freeSpaceThreshold uint64
-		freeBytes          uint64
-		obsoleteBytes      uint64
-		liveBytes          uint64
-		shouldPace         bool
+		freeBytes     uint64
+		obsoleteBytes uint64
+		liveBytes     uint64
+		// history of deletion reporting; the first value in each pair is the
+		// time in seconds, the second value is the deleted bytes. The time of
+		// pacing is the same as the last time in the history.
+		history [][2]int
+		// expected wait time for deleting 100 MB.
+		expected float64
 	}{
 		{
-			freeSpaceThreshold: 10,
-			freeBytes:          100,
-			obsoleteBytes:      1,
-			liveBytes:          100,
-			shouldPace:         true,
+			freeBytes:     160 * GB,
+			obsoleteBytes: 1 * MB,
+			liveBytes:     160 * MB,
+			expected:      1.0,
 		},
-		// As freeBytes < freeSpaceThreshold, there should be no throttling.
+		// As freeBytes < free space threshold, there should be no throttling.
 		{
-			freeSpaceThreshold: 10,
-			freeBytes:          5,
-			obsoleteBytes:      1,
-			liveBytes:          100,
-			shouldPace:         false,
+			freeBytes:     5 * GB,
+			obsoleteBytes: 1 * MB,
+			liveBytes:     100 * MB,
+			expected:      0.0,
 		},
 		// As obsoleteBytesRatio > 0.20, there should be no throttling.
 		{
-			freeSpaceThreshold: 10,
-			freeBytes:          500,
-			obsoleteBytes:      50,
-			liveBytes:          100,
-			shouldPace:         false,
+			freeBytes:     500 * GB,
+			obsoleteBytes: 50 * MB,
+			liveBytes:     100 * MB,
+			expected:      0.0,
 		},
 		// When the obsolete ratio is unknown, there should be no throttling.
 		{
-			freeSpaceThreshold: 10,
-			freeBytes:          500,
-			obsoleteBytes:      0,
-			liveBytes:          0,
-			shouldPace:         false,
+			freeBytes:     500 * GB,
+			obsoleteBytes: 0,
+			liveBytes:     0,
+			expected:      0.0,
+		},
+		// History shows 200MB/sec deletions on average over the last 5 minutes;
+		// the wait time should be half the default.
+		{
+			freeBytes:     160 * GB,
+			obsoleteBytes: 1 * MB,
+			liveBytes:     160 * MB,
+			history:       [][2]int{{0, 5 * 60 * 200 * MB}},
+			expected:      0.5,
+		},
+		// History shows 1000MB/sec deletions on average over the last 5 minutes;
+		// the wait time should be a tenth of the default.
+		{
+			freeBytes:     160 * GB,
+			obsoleteBytes: 1 * MB,
+			liveBytes:     160 * MB,
+			history:       [][2]int{{0, 60 * 1000 * MB}, {3 * 60, 60 * 4 * 1000 * MB}, {4 * 60, 0}},
+			expected:      0.1,
+		},
+		// The first entry in the history is too old; it should be discarded.
+		{
+			freeBytes:     160 * GB,
+			obsoleteBytes: 1 * MB,
+			liveBytes:     160 * MB,
+			history:       [][2]int{{0, 10 * 60 * 10000 * MB}, {3 * 60, 4 * 60 * 200 * MB}, {7 * 60, 1 * 60 * 200 * MB}},
+			expected:      0.5,
 		},
 	}
 	for tcIdx, tc := range testCases {
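The expected values in the three history-based cases follow from the escalation rule in `PacingDelay`: the history sum over the 5-minute window is divided by 300 seconds, and the resulting rate replaces the 100 MB/s target whenever it is higher. A quick standalone check of the 0.5 / 0.1 / 0.5 expectations (plain arithmetic mirroring the pacer's formula, not Pebble test code):

```go
package main

import "fmt"

func main() {
	const MB = 1 << 20
	const target = 100 * MB // the test's target rate, bytes/sec
	const window = 5 * 60   // the pacer's history window, in seconds

	// Deleted bytes within the 5-minute window for the three history-based
	// cases above; entries older than the window have been dropped.
	for _, histBytes := range []int64{
		5 * 60 * 200 * MB,        // case 5: 200 MB/s average
		(60 + 4*60) * 1000 * MB,  // case 6: 1000 MB/s average
		(4*60 + 1*60) * 200 * MB, // case 7: first entry aged out; 200 MB/s
	} {
		rate := int64(target)
		if histRate := histBytes / window; histRate > rate {
			rate = histRate // keep up with the observed deletion rate
		}
		fmt.Printf("wait for 100MB: %.1fs\n", float64(100*MB)/float64(rate))
	}
	// Output: 0.5s, 0.1s, 0.5s
}
```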
@@ -60,10 +89,84 @@ func TestDeletionPacerMaybeThrottle(t *testing.T) {
 					obsoleteBytes: tc.obsoleteBytes,
 				}
 			}
-			pacer := newDeletionPacer(getInfo)
-			pacer.freeSpaceThreshold = tc.freeSpaceThreshold
-			result := pacer.shouldPace()
-			require.Equal(t, tc.shouldPace, result)
+			start := time.Now()
+			last := start
+			pacer := newDeletionPacer(start, 100*MB, getInfo)
+			for _, h := range tc.history {
+				last = start.Add(time.Second * time.Duration(h[0]))
+				pacer.ReportDeletion(last, uint64(h[1]))
+			}
+			result := pacer.PacingDelay(last, 100*MB)
+			require.InDelta(t, tc.expected, result, 1e-7)
 		})
 	}
 }
+
+// TestDeletionPacerHistory tests the history helper by crosschecking Sum()
+// against a naive implementation.
+func TestDeletionPacerHistory(t *testing.T) {
+	type event struct {
+		time time.Time
+		// If report is 0, this event is a Sum(); otherwise it is an Add().
+		report int64
+	}
+	numEvents := 1 + rand.Intn(200)
+	timeframe := time.Duration(1+rand.Intn(60*100)) * time.Second
+	events := make([]event, numEvents)
+	startTime := time.Now()
+	for i := range events {
+		events[i].time = startTime.Add(time.Duration(rand.Int63n(int64(timeframe))))
+		if rand.Intn(3) == 0 {
+			events[i].report = 0
+		} else {
+			events[i].report = int64(rand.Intn(100000))
+		}
+	}
+	sort.Slice(events, func(i, j int) bool {
+		return events[i].time.Before(events[j].time)
+	})
+
+	var h history
+	h.Init(startTime, timeframe)
+
+	// partialSums[i] := SUM_{j<i} events[j].report
+	partialSums := make([]int64, len(events)+1)
+	for i := range events {
+		partialSums[i+1] = partialSums[i] + events[i].report
+	}
+
+	for i, e := range events {
+		if e.report != 0 {
+			h.Add(e.time, e.report)
+			continue
+		}
+		result := h.Sum(e.time)
+
+		// getIdx returns the largest event index <= i which is before the
+		// cutoff time.
+		getIdx := func(cutoff time.Time) int {
+			for j := i; j >= 0; j-- {
+				if events[j].time.Before(cutoff) {
+					return j
+				}
+			}
+			return -1
+		}
+
+		// Sum all report values in the last timeframe, and see if recent events
+		// (allowing 1% error in the cutoff time) match the result.
+		a := getIdx(e.time.Add(-timeframe * (historyEpochs + 1) / historyEpochs))
+		b := getIdx(e.time.Add(-timeframe * (historyEpochs - 1) / historyEpochs))
+		found := false
+		for j := a; j <= b; j++ {
+			if partialSums[i+1]-partialSums[j+1] == result {
+				found = true
+				break
+			}
+		}
+		if !found {
+			t.Fatalf("incorrect Sum() result %d; %v", result, events[a+1:i+1])
+		}
+	}
+}
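Taken together, the patch turns `TargetByteDeletionRate` into a soft floor rather than a hard cap: the pacer escalates above it when deletions fall behind, and drops pacing entirely when disk space is low. A hedged usage sketch, assuming the exported `pebble.Options` field referenced by cleaner.go above:

```go
package main

import (
	"log"

	"github.com/cockroachdb/pebble"
)

func main() {
	// Cap obsolete-file deletions at roughly 128 MB/s under normal conditions;
	// per the pacer changes above, this rate rises automatically if deletions
	// start falling behind, and pacing is skipped entirely when free space is
	// low or the obsolete/live ratio is high. "demo-db" is a placeholder path.
	db, err := pebble.Open("demo-db", &pebble.Options{
		TargetByteDeletionRate: 128 << 20, // bytes per second; 0 disables pacing
	})
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
}
```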