diff --git a/compaction.go b/compaction.go
index b18095d8ca..16bac21266 100644
--- a/compaction.go
+++ b/compaction.go
@@ -3547,6 +3547,13 @@ func (d *DB) doDeleteObsoleteFiles(jobID int) {
 		}
 	}
 	if len(filesToDelete) > 0 {
+		var tablesSize uint64
+		for i := range filesToDelete {
+			if filesToDelete[i].fileType == fileTypeTable {
+				tablesSize += filesToDelete[i].fileSize
+			}
+		}
+		d.deletionPacer.reportDeletion(tablesSize)
 		d.deleters.Add(1)
 		// Delete asynchronously if that could get held up in the pacer.
 		if d.opts.Experimental.MinDeletionRate > 0 {
@@ -3561,15 +3568,11 @@ func (d *DB) doDeleteObsoleteFiles(jobID int) {
 // must NOT be held when calling this method.
 func (d *DB) paceAndDeleteObsoleteFiles(jobID int, files []obsoleteFile) {
 	defer d.deleters.Done()
-	pacer := (pacer)(nilPacer)
-	if d.opts.Experimental.MinDeletionRate > 0 {
-		pacer = newDeletionPacer(d.deletionLimiter, d.getDeletionPacerInfo)
-	}
 	for _, of := range files {
 		path := base.MakeFilepath(d.opts.FS, of.dir, of.fileType, of.fileNum)
 		if of.fileType == fileTypeTable {
-			_ = pacer.maybeThrottle(of.fileSize)
+			_ = d.deletionPacer.maybeThrottle(of.fileSize)
 			d.mu.Lock()
 			d.mu.versions.metrics.Table.ObsoleteCount--
 			d.mu.versions.metrics.Table.ObsoleteSize -= of.fileSize
diff --git a/db.go b/db.go
index d572245719..9ec2d8ba4e 100644
--- a/db.go
+++ b/db.go
@@ -297,7 +297,7 @@ type DB struct {
 	closed   *atomic.Value
 	closedCh chan struct{}
 
-	deletionLimiter limiter
+	deletionPacer pacer
 
 	// Async deletion jobs spawned by cleaners increment this WaitGroup, and
 	// call Done when completed. Once `d.mu.cleaning` is false, the db.Close()
diff --git a/open.go b/open.go
index 58bfa53341..b1ad596af1 100644
--- a/open.go
+++ b/open.go
@@ -23,7 +23,6 @@ import (
 	"github.com/cockroachdb/pebble/internal/invariants"
 	"github.com/cockroachdb/pebble/internal/manifest"
 	"github.com/cockroachdb/pebble/internal/manual"
-	"github.com/cockroachdb/pebble/internal/rate"
 	"github.com/cockroachdb/pebble/objstorage"
 	"github.com/cockroachdb/pebble/record"
 	"github.com/cockroachdb/pebble/vfs"
@@ -208,9 +207,11 @@ func Open(dirname string, opts *Options) (db *DB, _ error) {
 		apply: d.commitApply,
 		write: d.commitWrite,
 	})
-	d.deletionLimiter = rate.NewLimiter(
-		rate.Limit(d.opts.Experimental.MinDeletionRate),
-		d.opts.Experimental.MinDeletionRate)
+	if d.opts.Experimental.MinDeletionRate > 0 {
+		d.deletionPacer = newDeletionPacer(int64(d.opts.Experimental.MinDeletionRate), d.getDeletionPacerInfo)
+	} else {
+		d.deletionPacer = nilPacer
+	}
 	d.mu.nextJobID = 1
 	d.mu.mem.nextSize = opts.MemTableSize
 	if d.mu.mem.nextSize > initialMemTableSize {
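Note: with this change the pacer is constructed once in `Open` from `Options.Experimental.MinDeletionRate` (with `nilPacer` as the unpaced fallback), instead of being re-created by every cleanup job. A minimal usage sketch; the option field is the one shown in the hunk above, while the path and rate value are purely illustrative:

```go
package main

import (
	"log"

	"github.com/cockroachdb/pebble"
)

func main() {
	opts := &pebble.Options{}
	// MinDeletionRate is the target deletion pacing rate in bytes per second.
	// Leaving it at 0 (the default) selects nilPacer, i.e. unpaced deletions.
	opts.Experimental.MinDeletionRate = 128 << 20 // 128 MB/s, illustrative

	db, err := pebble.Open("/tmp/pebble-demo", opts)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
}
```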
diff --git a/pacer.go b/pacer.go
index 8d3d17f857..7546e56cbf 100644
--- a/pacer.go
+++ b/pacer.go
@@ -5,6 +5,7 @@
 package pebble
 
 import (
+	"sync"
 	"time"
 
 	"github.com/cockroachdb/errors"
@@ -24,6 +25,8 @@
 // limit background IO usage so that it does not contend with foreground traffic.
 type pacer interface {
 	maybeThrottle(bytesIterated uint64) error
+
+	reportDeletion(bytesToDelete uint64)
 }
 
 // deletionPacerInfo contains any info from the db necessary to make deletion
@@ -40,19 +43,33 @@
 // negatively impacted if too many blocks are deleted very quickly, so this
 // mechanism helps mitigate that.
 type deletionPacer struct {
-	limiter               limiter
-	freeSpaceThreshold    uint64
-	obsoleteBytesMaxRatio float64
+	limiter                limiter
+	freeSpaceThreshold     uint64
+	obsoleteBytesMaxRatio  float64
+	targetByteDeletionRate int64
 
 	getInfo func() deletionPacerInfo
+
+	mu struct {
+		sync.Mutex
+
+		// history keeps track of recent deletion history; it is used to
+		// increase the deletion rate to match the pace of deletions.
+		history history
+	}
 }
 
+const deletePacerHistory = 5 * time.Minute
+
 // newDeletionPacer instantiates a new deletionPacer for use when deleting
-// obsolete files. The limiter passed in must be a singleton shared across this
-// pebble instance.
-func newDeletionPacer(limiter limiter, getInfo func() deletionPacerInfo) *deletionPacer {
-	return &deletionPacer{
-		limiter: limiter,
+// obsolete files.
+func newDeletionPacer(
+	targetByteDeletionRate int64, getInfo func() deletionPacerInfo,
+) *deletionPacer {
+	d := &deletionPacer{
+		limiter: rate.NewLimiter(rate.Limit(targetByteDeletionRate), int(targetByteDeletionRate)),
+
 		// If there are less than freeSpaceThreshold bytes of free space on
 		// disk, do not pace deletions at all.
 		freeSpaceThreshold: 16 << 30, // 16 GB
@@ -60,8 +77,12 @@
 		// obsoleteBytesMaxRatio, do not pace deletions at all.
 		obsoleteBytesMaxRatio: 0.20,
 
+		targetByteDeletionRate: targetByteDeletionRate,
+
 		getInfo: getInfo,
 	}
+	d.mu.history.Init(time.Now(), deletePacerHistory)
+	return d
 }
 
 // limit applies rate limiting if the current free disk space is more than
@@ -74,6 +95,16 @@ func (p *deletionPacer) limit(amount uint64, info deletionPacerInfo) error {
 	}
 	paceDeletions := info.freeBytes > p.freeSpaceThreshold &&
 		obsoleteBytesRatio < p.obsoleteBytesMaxRatio
+
+	p.mu.Lock()
+	if historyRate := p.mu.history.Sum(time.Now()) / int64(deletePacerHistory/time.Second); historyRate > p.targetByteDeletionRate {
+		// Deletions have been happening at a rate higher than the target rate; we
+		// want to speed up deletions so they match the historic rate. We do this by
+		// decreasing the amount accordingly.
+		amount = uint64(float64(p.targetByteDeletionRate) / float64(historyRate) * float64(amount))
+	}
+	p.mu.Unlock()
+
 	if paceDeletions {
 		burst := p.limiter.Burst()
 		for amount > uint64(burst) {
@@ -111,8 +142,80 @@ func (p *deletionPacer) maybeThrottle(bytesToDelete uint64) error {
 	return p.limit(bytesToDelete, p.getInfo())
 }
 
+// reportDeletion is used to report a deletion to the pacer. The pacer uses it
+// to keep track of the recent rate of deletions and potentially increase the
+// deletion rate accordingly.
+//
+// reportDeletion is thread-safe.
+func (p *deletionPacer) reportDeletion(bytesToDelete uint64) {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	p.mu.history.Add(time.Now(), int64(bytesToDelete))
+}
+
 type noopPacer struct{}
 
 func (p *noopPacer) maybeThrottle(_ uint64) error {
 	return nil
 }
+
+func (p *noopPacer) reportDeletion(_ uint64) {}
+
+// history is a helper used to keep track of the recent history of a set of
+// data points (in our case deleted bytes), at limited granularity.
+// Specifically, we split the desired timeframe into 100 "epochs" and all times
+// are effectively rounded down to the nearest epoch boundary.
+type history struct {
+	epochDuration time.Duration
+	startTime     time.Time
+	// currEpoch is the epoch of the most recent operation.
+	currEpoch int64
+	// val contains the recent epoch values.
+	// val[currEpoch % historyEpochs] is the current epoch.
+	// val[(currEpoch + 1) % historyEpochs] is the oldest epoch.
+	val [historyEpochs]int64
+	// sum is always equal to the sum of values in val.
+	sum int64
+}
+
+const historyEpochs = 100
+
+// Init initializes the history helper to keep track of data over the given
+// timeframe.
+func (h *history) Init(now time.Time, timeframe time.Duration) {
+	*h = history{
+		epochDuration: timeframe / time.Duration(historyEpochs),
+		startTime:     now,
+		currEpoch:     0,
+		sum:           0,
+	}
+}
+
+// Add adds a value for the current time.
+func (h *history) Add(now time.Time, val int64) {
+	h.advance(now)
+	h.val[h.currEpoch%historyEpochs] += val
+	h.sum += val
+}
+
+// Sum returns the sum of recent values. The result is approximate in that the
+// cut-off time is within 1% of the exact one.
+func (h *history) Sum(now time.Time) int64 {
+	h.advance(now)
+	return h.sum
+}
+
+func (h *history) epoch(t time.Time) int64 {
+	return int64(t.Sub(h.startTime) / h.epochDuration)
+}
+
+// advance advances currEpoch to the epoch containing the given time,
+// forgetting the values of epochs that fall out of the timeframe.
+func (h *history) advance(now time.Time) {
+	epoch := h.epoch(now)
+	for h.currEpoch < epoch {
+		h.currEpoch++
+		// Forget the data for the oldest epoch.
+		h.sum -= h.val[h.currEpoch%historyEpochs]
+		h.val[h.currEpoch%historyEpochs] = 0
+	}
+}
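To make the adjustment in `limit` concrete: the token bucket always refills at `targetByteDeletionRate`, so shrinking the amount requested per deleted byte is what raises the effective deletion pace. A worked example of the same arithmetic, with made-up numbers that are not from the patch:

```go
package main

import "fmt"

func main() {
	const target = int64(16 << 20) // targetByteDeletionRate: 16 MB/s

	// Suppose 19,200 MB of tables were reported deleted over the 5-minute
	// (300 s) history window, i.e. a historic rate of 64 MB/s.
	historyRate := int64(19200<<20) / 300

	amount := uint64(8 << 20) // maybeThrottle called for an 8 MB file
	if historyRate > target {
		// Same scaling as deletionPacer.limit: request only a quarter of the
		// bytes from the limiter, quadrupling the effective deletion rate.
		amount = uint64(float64(target) / float64(historyRate) * float64(amount))
	}
	fmt.Printf("%d MB requested from the limiter\n", amount>>20) // prints 2 MB
}
```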
diff --git a/pacer_test.go b/pacer_test.go
index ec6a0ac55d..3d267ada60 100644
--- a/pacer_test.go
+++ b/pacer_test.go
@@ -7,6 +7,8 @@ package pebble
 import (
 	"bytes"
 	"fmt"
+	"math/rand"
+	"sort"
 	"strconv"
 	"strings"
 	"testing"
@@ -88,7 +90,8 @@ func TestCompactionPacerMaybeThrottle(t *testing.T) {
 					obsoleteBytes: obsoleteBytes,
 				}
 			}
-			deletionPacer := newDeletionPacer(&mockLimiter, getInfo)
+			deletionPacer := newDeletionPacer(100, getInfo)
+			deletionPacer.limiter = &mockLimiter
 			deletionPacer.freeSpaceThreshold = slowdownThreshold
 			err := deletionPacer.maybeThrottle(bytesIterated)
 			if err != nil {
@@ -105,3 +108,72 @@
 		}
 	})
 }
+
+// TestDeletionPacerHistory tests the history helper by crosschecking Sum()
+// against a naive implementation.
+func TestDeletionPacerHistory(t *testing.T) {
+	type event struct {
+		time time.Time
+		// If report is 0, this event is a Sum(). Otherwise it is an Add().
+		report int64
+	}
+	numEvents := 1 + rand.Intn(200)
+	timeframe := time.Duration(1+rand.Intn(60*100)) * time.Second
+	events := make([]event, numEvents)
+	startTime := time.Now()
+	for i := range events {
+		events[i].time = startTime.Add(time.Duration(rand.Int63n(int64(timeframe))))
+		if rand.Intn(3) == 0 {
+			events[i].report = 0
+		} else {
+			events[i].report = int64(rand.Intn(100000))
+		}
+	}
+	sort.Slice(events, func(i, j int) bool {
+		return events[i].time.Before(events[j].time)
+	})
+
+	var h history
+	h.Init(startTime, timeframe)
+
+	// partialSums[i] := SUM_{j<i} events[j].report
+	partialSums := make([]int64, len(events)+1)
+	for i := range events {
+		partialSums[i+1] = partialSums[i] + events[i].report
+	}
+
+	for i, e := range events {
+		if e.report != 0 {
+			h.Add(e.time, e.report)
+			continue
+		}
+		result := h.Sum(e.time)
+
+		// getIdx returns the largest index of an event that happened before the
+		// given cutoff time, or -1 if there is no such event.
+		getIdx := func(cutoff time.Time) int {
+			for j := i; j >= 0; j-- {
+				if events[j].time.Before(cutoff) {
+					return j
+				}
+			}
+			return -1
+		}
+
+		// Sum all report values in the last timeframe, and see if recent events
+		// (allowing 1% error in the cutoff time) match the result.
+		a := getIdx(e.time.Add(-timeframe * (historyEpochs + 1) / historyEpochs))
+		b := getIdx(e.time.Add(-timeframe * (historyEpochs - 1) / historyEpochs))
+		found := false
+		for j := a; j <= b; j++ {
+			if partialSums[i+1]-partialSums[j+1] == result {
+				found = true
+				break
+			}
+		}
+		if !found {
+			t.Fatalf("incorrect Sum() result %d; %v", result, events[a+1:i+1])
+		}
+	}
+}
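As a closing illustration of why Sum() is only approximate: values land in one of 100 epoch slots, and a slot is only forgotten once the current epoch wraps around onto it. The following self-contained sketch mirrors (rather than imports) the history helper above; the type name and the numbers are illustrative only:

```go
package main

import (
	"fmt"
	"time"
)

const epochs = 100

// ringHistory is a compressed re-statement of pacer.go's history type.
type ringHistory struct {
	epochDuration time.Duration
	startTime     time.Time
	currEpoch     int64
	val           [epochs]int64
	sum           int64
}

func (h *ringHistory) advance(now time.Time) {
	epoch := int64(now.Sub(h.startTime) / h.epochDuration)
	for h.currEpoch < epoch {
		h.currEpoch++
		// The slot we are about to reuse holds the oldest epoch; drop it.
		h.sum -= h.val[h.currEpoch%epochs]
		h.val[h.currEpoch%epochs] = 0
	}
}

func (h *ringHistory) add(now time.Time, v int64) {
	h.advance(now)
	h.val[h.currEpoch%epochs] += v
	h.sum += v
}

func main() {
	start := time.Now()
	// A 5-minute window split into 100 epochs: each epoch covers 3 seconds.
	h := ringHistory{epochDuration: 5 * time.Minute / epochs, startTime: start}

	h.add(start, 100)                     // lands in epoch 0
	h.add(start.Add(3*time.Second), 50)   // lands in epoch 1
	h.advance(start.Add(5 * time.Minute)) // epoch 100: slot 0 is dropped

	// Prints 50: epoch 0 has expired, but epoch 1 survives one more epoch,
	// which is the up-to-1% slack in the cut-off that Sum() documents.
	fmt.Println(h.sum)
}
```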