db: improve delete pacing
Delete pacing is currently an on-or-off decision: if we are running
out of space or have too many obsolete bytes relative to live bytes,
we disable pacing; otherwise we pace at the configured rate
(128 MB/s by default in CRDB).

This change improves pacing by keeping track of the average deletion
rate over the last 5 minutes and increasing the target rate to match
this rate if necessary. The intention is to avoid deletions lagging
behind.

Fixes #2662.
RaduBerinde committed Jun 28, 2023
1 parent c807f60 commit 41563fd
Showing 3 changed files with 314 additions and 76 deletions.
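In outline, the new policy treats the configured rate as a floor and raises it to the trailing five-minute average when deletions come in faster, so the backlog of obsolete files cannot grow without bound. A minimal standalone Go sketch of that decision (names here are illustrative, not the commit's API; the real logic is in pacer.go below):

package main

import (
	"fmt"
	"time"
)

// effectiveRate returns the pacing rate in bytes/sec: the configured target,
// bumped up to the average deletion rate observed over the trailing window so
// that deletions don't lag behind.
func effectiveRate(targetRate int64, recentBytes int64, window time.Duration) int64 {
	historyRate := recentBytes / int64(window/time.Second)
	if historyRate > targetRate {
		return historyRate
	}
	return targetRate
}

func main() {
	const target = 128 << 20 // 128 MB/s, the CRDB default
	// 150 GB deleted over the last 5 minutes (~512 MB/s) raises the rate.
	fmt.Println(effectiveRate(target, 150<<30, 5*time.Minute))
}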
72 changes: 44 additions & 28 deletions cleaner.go
@@ -74,7 +74,7 @@ func openCleanupManager(
 		opts:            opts,
 		objProvider:     objProvider,
 		onTableDeleteFn: onTableDeleteFn,
-		deletePacer:     newDeletionPacer(getDeletePacerInfo),
+		deletePacer:     newDeletionPacer(time.Now(), int64(opts.TargetByteDeletionRate), getDeletePacerInfo),
 		jobsCh:          make(chan *cleanupJob, jobsChLen),
 	}
 	cm.mu.completedJobsCond.L = &cm.mu.Mutex
@@ -102,6 +102,21 @@ func (cm *cleanupManager) EnqueueJob(jobID int, obsoleteFiles []obsoleteFile) {
 		jobID:         jobID,
 		obsoleteFiles: obsoleteFiles,
 	}
+
+	// Report deleted bytes to the pacer, which can use this data to potentially
+	// increase the deletion rate to keep up. We want to do this at enqueue time
+	// rather than when we get to the job; otherwise the reported bytes will be
+	// subject to the throttling rate, which defeats the purpose.
+	var pacingBytes uint64
+	for _, of := range obsoleteFiles {
+		if cm.needsPacing(of.fileType, of.fileNum) {
+			pacingBytes += of.fileSize
+		}
+	}
+	if pacingBytes > 0 {
+		cm.deletePacer.ReportDeletion(time.Now(), pacingBytes)
+	}
+
 	cm.mu.Lock()
 	cm.mu.queuedJobs++
 	cm.mu.Unlock()
@@ -130,23 +145,17 @@ func (cm *cleanupManager) Wait() {
 // mainLoop runs the manager's background goroutine.
 func (cm *cleanupManager) mainLoop() {
 	defer cm.waitGroup.Done()
-	useLimiter := false
-	var limiter tokenbucket.TokenBucket
-
-	if r := cm.opts.TargetByteDeletionRate; r != 0 {
-		useLimiter = true
-		limiter.Init(tokenbucket.TokensPerSecond(r), tokenbucket.Tokens(r))
-	}
-
+	var tb tokenbucket.TokenBucket
+	// Use a token bucket with 1 token / second refill rate and 1 token burst.
+	tb.Init(1.0, 1.0)
 	for job := range cm.jobsCh {
 		for _, of := range job.obsoleteFiles {
 			if of.fileType != fileTypeTable {
 				path := base.MakeFilepath(cm.opts.FS, of.dir, of.fileType, of.fileNum)
 				cm.deleteObsoleteFile(of.fileType, job.jobID, path, of.fileNum, of.fileSize)
 			} else {
-				if useLimiter {
-					cm.maybePace(&limiter, of.fileType, of.fileNum, of.fileSize)
-				}
+				cm.maybePace(&tb, of.fileType, of.fileNum, of.fileSize)
 				cm.onTableDeleteFn(of.fileSize)
 				cm.deleteObsoleteObject(fileTypeTable, job.jobID, of.fileNum)
 			}
@@ -158,33 +167,40 @@ func (cm *cleanupManager) mainLoop() {
 	}
 }
 
-// maybePace sleeps before deleting an object if appropriate. It is always
-// called from the background goroutine.
-func (cm *cleanupManager) maybePace(
-	limiter *tokenbucket.TokenBucket,
-	fileType base.FileType,
-	fileNum base.DiskFileNum,
-	fileSize uint64,
-) {
+func (cm *cleanupManager) needsPacing(fileType base.FileType, fileNum base.DiskFileNum) bool {
 	if fileType != fileTypeTable {
-		return
+		return false
 	}
 	meta, err := cm.objProvider.Lookup(fileType, fileNum)
 	if err != nil {
 		// The object was already removed from the provider; we won't actually
 		// delete anything, so we don't need to pace.
-		return
+		return false
 	}
-	if meta.IsShared() {
-		// Don't throttle deletion of shared objects.
-		return
-	}
-	if !cm.deletePacer.shouldPace() {
-		// The deletion pacer decided that we shouldn't throttle; account
-		// for the operation but don't wait for tokens.
-		limiter.Adjust(-tokenbucket.Tokens(fileSize))
+	// Don't throttle deletion of shared objects.
+	return !meta.IsShared()
+}
+
+// maybePace sleeps before deleting an object if appropriate. It is always
+// called from the background goroutine.
+func (cm *cleanupManager) maybePace(
+	tb *tokenbucket.TokenBucket, fileType base.FileType, fileNum base.DiskFileNum, fileSize uint64,
+) {
+	if !cm.needsPacing(fileType, fileNum) {
+		return
+	}
+
+	tokens := cm.deletePacer.PacingDelay(time.Now(), fileSize)
+	if tokens == 0.0 {
+		// The token bucket might be in debt; it could make us wait even for 0
+		// tokens. We don't want that if the pacer decided throttling should be
+		// disabled.
 		return
 	}
-	// Wait for tokens.
+	// Wait for tokens. We use a token bucket instead of sleeping outright because
+	// the token bucket accumulates up to one second of unused tokens.
 	for {
-		ok, d := limiter.TryToFulfill(tokenbucket.Tokens(fileSize))
+		ok, d := tb.TryToFulfill(tokenbucket.Tokens(tokens))
 		if ok {
 			break
 		}
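For context on the wait loop above: the bucket is initialized with a refill rate of 1 token per second and a burst of 1 token, so one token equals one second of pacing delay, and up to a second of unused capacity carries over. A hedged sketch of the same pattern against github.com/cockroachdb/tokenbucket, assuming the Init/TryToFulfill behavior shown in the diff:

package main

import (
	"time"

	"github.com/cockroachdb/tokenbucket"
)

// waitForPacing blocks until the bucket can cover waitSeconds worth of
// tokens, mirroring the cleaner.go loop above.
func waitForPacing(tb *tokenbucket.TokenBucket, waitSeconds float64) {
	for {
		ok, d := tb.TryToFulfill(tokenbucket.Tokens(waitSeconds))
		if ok {
			return
		}
		time.Sleep(d) // d is the bucket's suggested retry delay
	}
}

func main() {
	var tb tokenbucket.TokenBucket
	tb.Init(1.0, 1.0)        // 1 token/sec refill, 1 token burst
	waitForPacing(&tb, 0.5)  // covered by the initial burst; returns immediately
	waitForPacing(&tb, 0.75) // sleeps until enough tokens accumulate
}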
153 changes: 136 additions & 17 deletions pacer.go
@@ -4,6 +4,11 @@
 
 package pebble
 
+import (
+	"sync"
+	"time"
+)
+
 // deletionPacerInfo contains any info from the db necessary to make deletion
 // pacing decisions (to limit background IO usage so that it does not contend
 // with foreground traffic).
@@ -19,36 +24,150 @@ type deletionPacerInfo struct {
 // negatively impacted if too many blocks are deleted very quickly, so this
 // mechanism helps mitigate that.
 type deletionPacer struct {
-	freeSpaceThreshold uint64
+	// If there are less than freeSpaceThreshold bytes of free space on
+	// disk, do not pace deletions at all.
+	freeSpaceThreshold uint64
+
+	// If the ratio of obsolete bytes to live bytes is greater than
+	// obsoleteBytesMaxRatio, do not pace deletions at all.
 	obsoleteBytesMaxRatio float64
 
+	mu struct {
+		sync.Mutex
+
+		// history keeps track of recent deletion history; it is used to
+		// increase the deletion rate to match the pace of deletions.
+		history history
+	}
+
+	targetByteDeletionRate int64
+
 	getInfo func() deletionPacerInfo
 }
 
+const deletePacerHistory = 5 * time.Minute
+
 // newDeletionPacer instantiates a new deletionPacer for use when deleting
-// obsolete files. The limiter passed in must be a singleton shared across this
-// pebble instance.
-func newDeletionPacer(getInfo func() deletionPacerInfo) *deletionPacer {
-	return &deletionPacer{
-		// If there are less than freeSpaceThreshold bytes of free space on
-		// disk, do not pace deletions at all.
-		freeSpaceThreshold: 16 << 30, // 16 GB
-		// If the ratio of obsolete bytes to live bytes is greater than
-		// obsoleteBytesMaxRatio, do not pace deletions at all.
+// obsolete files.
+//
+// targetByteDeletionRate is the rate (in bytes/sec) at which we want to
+// normally limit deletes (when we are not falling behind or running out of
+// space). A value of 0.0 disables pacing.
+func newDeletionPacer(
+	now time.Time, targetByteDeletionRate int64, getInfo func() deletionPacerInfo,
+) *deletionPacer {
+	d := &deletionPacer{
+		freeSpaceThreshold:    16 << 30, // 16 GB
 		obsoleteBytesMaxRatio: 0.20,
 
-		getInfo: getInfo,
+		targetByteDeletionRate: targetByteDeletionRate,
+		getInfo:                getInfo,
 	}
+	d.mu.history.Init(now, deletePacerHistory)
+	return d
 }
 
-// shouldPace returns true if we should apply rate limiting; this is the
-// case when the current free disk space is more than freeSpaceThreshold, and
-// the ratio of obsolete to live bytes is less than obsoleteBytesMaxRatio.
-func (p *deletionPacer) shouldPace() bool {
+// ReportDeletion is used to report a deletion to the pacer. The pacer uses it
+// to keep track of the recent rate of deletions and potentially increase the
+// deletion rate accordingly.
+//
+// ReportDeletion is thread-safe.
+func (p *deletionPacer) ReportDeletion(now time.Time, bytesToDelete uint64) {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	p.mu.history.Add(now, int64(bytesToDelete))
+}
+
+// PacingDelay returns the recommended pacing wait time (in seconds) for
+// deleting the given number of bytes.
+//
+// PacingDelay is thread-safe.
+func (p *deletionPacer) PacingDelay(now time.Time, bytesToDelete uint64) (waitSeconds float64) {
+	if p.targetByteDeletionRate == 0 {
+		// Pacing disabled.
+		return 0.0
+	}
+
 	info := p.getInfo()
-	obsoleteBytesRatio := float64(1.0)
+	if info.freeBytes <= p.freeSpaceThreshold {
+		return 0.0
+	}
+	obsoleteBytesRatio := 1.0
 	if info.liveBytes > 0 {
 		obsoleteBytesRatio = float64(info.obsoleteBytes) / float64(info.liveBytes)
 	}
-	return info.freeBytes > p.freeSpaceThreshold && obsoleteBytesRatio < p.obsoleteBytesMaxRatio
+	if obsoleteBytesRatio >= p.obsoleteBytesMaxRatio {
+		return 0.0
+	}
+
+	rate := p.targetByteDeletionRate
+
+	// See if recent deletion rate is more than our target; if so, use that as
+	// our target so that we don't fall behind.
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	if historyRate := p.mu.history.Sum(now) / int64(deletePacerHistory/time.Second); rate < historyRate {
+		rate = historyRate
+	}
+
+	return float64(bytesToDelete) / float64(rate)
+}
+
+// history is a helper used to keep track of the recent history of a set of
+// data points (in our case deleted bytes), at limited granularity.
+// Specifically, we split the desired timeframe into 100 "epochs" and all times
+// are effectively rounded down to the nearest epoch boundary.
+type history struct {
+	epochDuration time.Duration
+	startTime     time.Time
+	// currEpoch is the epoch of the most recent operation.
+	currEpoch int64
+	// val contains the recent epoch values.
+	// val[currEpoch % historyEpochs] is the current epoch.
+	// val[(currEpoch + 1) % historyEpochs] is the oldest epoch.
+	val [historyEpochs]int64
+	// sum is always equal to the sum of values in val.
+	sum int64
+}
+
+const historyEpochs = 100
+
+// Init the history helper to keep track of data over the given timeframe.
+func (h *history) Init(now time.Time, timeframe time.Duration) {
+	*h = history{
+		epochDuration: timeframe / time.Duration(historyEpochs),
+		startTime:     now,
+		currEpoch:     0,
+		sum:           0,
+	}
+}
+
+// Add adds a value for the current time.
+func (h *history) Add(now time.Time, val int64) {
+	h.advance(now)
+	h.val[h.currEpoch%historyEpochs] += val
+	h.sum += val
+}
+
+// Sum returns the sum of recent values. The result is approximate in that the
+// cut-off time is within 1% of the exact one.
+func (h *history) Sum(now time.Time) int64 {
+	h.advance(now)
+	return h.sum
+}
+
+func (h *history) epoch(t time.Time) int64 {
+	return int64(t.Sub(h.startTime) / h.epochDuration)
+}
+
+// advance advances the time to the given time.
+func (h *history) advance(now time.Time) {
+	epoch := h.epoch(now)
+	for h.currEpoch < epoch {
+		h.currEpoch++
+		// Forget the data for the oldest epoch.
+		h.sum -= h.val[h.currEpoch%historyEpochs]
+		h.val[h.currEpoch%historyEpochs] = 0
+	}
+}
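To make the pacer's arithmetic concrete: the 5-minute window is split into 100 epochs of 3 seconds each, so expiring one stale epoch shifts the cut-off by 1% of the window, which is where the Sum comment's accuracy claim comes from. A small worked example of the resulting delay, using made-up numbers in place of real measurements:

package main

import (
	"fmt"
	"time"
)

func main() {
	const window = 5 * time.Minute
	const epochs = 100
	fmt.Println(window / epochs) // 3s per epoch: 1% of the window

	// Delay math from PacingDelay: the target is 128 MB/s, but 120 GB
	// deleted over the last 5 minutes averages ~410 MB/s, so the
	// history rate wins.
	target := int64(128 << 20)
	historyRate := int64(120<<30) / int64(window/time.Second)
	rate := target
	if historyRate > rate {
		rate = historyRate
	}
	// Deleting a 64 MB sstable at that rate waits ~0.16 seconds.
	fmt.Println(float64(64<<20) / float64(rate))
}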
