Skip to content

Commit

Permalink
db: improve delete pacing
Browse files Browse the repository at this point in the history
We improve the delete pacer to be able to scale delete pacing instead
of making an on/off decision.

The new pacer uses low/high thresholds for various factors; on one
side of the threshold we pace at the target rate and on the other side
we don't throttle at all. In-between we scale the wait time per byte
linearly.

Thresholds are as follows:

|                              | Start increasing rate at | Disable pacing at |
| ---------------------------- | ------------------------ | ----------------- |
| Free space                   | 32GB                     | 16GB              |
| Obsolete to live bytes ratio | 5%                       | 20%               |
| Obsolete bytes (*new*)       | 1GB                      | 10GB              |

Fixes cockroachdb#2662.
  • Loading branch information
RaduBerinde committed Jun 22, 2023
1 parent c0b26f2 commit 8b43e7b
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 68 deletions.
32 changes: 11 additions & 21 deletions cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func openCleanupManager(
opts: opts,
objProvider: objProvider,
onTableDeleteFn: onTableDeleteFn,
deletePacer: newDeletionPacer(getDeletePacerInfo),
deletePacer: newDeletionPacer(opts.TargetByteDeletionRate, getDeletePacerInfo),
jobsCh: make(chan *cleanupJob, jobsChLen),
}
cm.mu.completedJobsCond.L = &cm.mu.Mutex
Expand Down Expand Up @@ -130,23 +130,17 @@ func (cm *cleanupManager) Wait() {
// mainLoop runs the manager's background goroutine.
func (cm *cleanupManager) mainLoop() {
defer cm.waitGroup.Done()
useLimiter := false
var limiter tokenbucket.TokenBucket

if r := cm.opts.TargetByteDeletionRate; r != 0 {
useLimiter = true
limiter.Init(tokenbucket.TokensPerSecond(r), tokenbucket.Tokens(r))
}

var tb tokenbucket.TokenBucket
// Use a token bucket with 1 token / second refill rate and 1 token burst.
tb.Init(1.0, 1.0)
for job := range cm.jobsCh {
for _, of := range job.obsoleteFiles {
if of.fileType != fileTypeTable {
path := base.MakeFilepath(cm.opts.FS, of.dir, of.fileType, of.fileNum)
cm.deleteObsoleteFile(of.fileType, job.jobID, path, of.fileNum, of.fileSize)
} else {
if useLimiter {
cm.maybePace(&limiter, of.fileType, of.fileNum, of.fileSize)
}
cm.maybePace(&tb, of.fileType, of.fileNum, of.fileSize)
cm.onTableDeleteFn(of.fileSize)
cm.deleteObsoleteObject(fileTypeTable, job.jobID, of.fileNum)
}
Expand All @@ -161,10 +155,7 @@ func (cm *cleanupManager) mainLoop() {
// maybePace sleeps before deleting an object if appropriate. It is always
// called from the background goroutine.
func (cm *cleanupManager) maybePace(
limiter *tokenbucket.TokenBucket,
fileType base.FileType,
fileNum base.DiskFileNum,
fileSize uint64,
tb *tokenbucket.TokenBucket, fileType base.FileType, fileNum base.DiskFileNum, fileSize uint64,
) {
meta, err := cm.objProvider.Lookup(fileType, fileNum)
if err != nil {
Expand All @@ -176,15 +167,14 @@ func (cm *cleanupManager) maybePace(
// Don't throttle deletion of shared objects.
return
}
if !cm.deletePacer.shouldPace() {
// The deletion pacer decided that we shouldn't throttle; account
// for the operation but don't wait for tokens.
limiter.Adjust(-tokenbucket.Tokens(fileSize))
tokens := cm.deletePacer.PacingDelay(fileSize)
if tokens == 0.0 {
return
}
// Wait for tokens.
// Wait for tokens. We use a token bucket instead of sleeping outright because
// the token bucket accumulates up to one second of unused tokens.
for {
ok, d := limiter.TryToFulfill(tokenbucket.Tokens(fileSize))
ok, d := tb.TryToFulfill(tokenbucket.Tokens(tokens))
if ok {
break
}
Expand Down
98 changes: 81 additions & 17 deletions pacer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,36 +19,100 @@ type deletionPacerInfo struct {
// negatively impacted if too many blocks are deleted very quickly, so this
// mechanism helps mitigate that.
type deletionPacer struct {
	// freeSpaceScaler adjusts pacing based on free disk space. Note that
	// PacingDelay inverts this scaler's result: ample free space means full
	// pacing, and pacing is disabled entirely once free space drops below the
	// scaler's low threshold (see newDeletionPacer for the thresholds).
	freeSpaceScaler linearScaler

	// obsoleteBytesRatioScaler reduces pacing as the ratio of obsolete bytes
	// to live bytes grows, so deletions speed up when we fall behind.
	obsoleteBytesRatioScaler linearScaler

	// obsoleteBytesAbsoluteScaler reduces pacing as the absolute amount of
	// obsolete bytes grows.
	obsoleteBytesAbsoluteScaler linearScaler

	// targetByteDeletionRate is the deletion rate (in bytes/sec) when pacing
	// is fully in effect. A value of 0 disables pacing altogether.
	targetByteDeletionRate float64

	// getInfo supplies the current free/live/obsolete byte counts used to
	// decide how much to throttle.
	getInfo func() deletionPacerInfo
}

// newDeletionPacer instantiates a new deletionPacer for use when deleting
// obsolete files. The limiter passed in must be a singleton shared across this
// pebble instance.
func newDeletionPacer(getInfo func() deletionPacerInfo) *deletionPacer {
// obsolete files.
//
// targetByteDeletionRate is the rate (in bytes/sec) at which we want to
// normally limit deletes (when we are not falling behind or running out of
// space). A value of 0.0 disables pacing.
func newDeletionPacer(targetByteDeletionRate int, getInfo func() deletionPacerInfo) *deletionPacer {
const GB = 1 << 30
const percent = 0.01
// TODO(radu): these thresholds should be configurable.
return &deletionPacer{
// If there are less than freeSpaceThreshold bytes of free space on
// disk, do not pace deletions at all.
freeSpaceThreshold: 16 << 30, // 16 GB
// If the ratio of obsolete bytes to live bytes is greater than
// obsoleteBytesMaxRatio, do not pace deletions at all.
obsoleteBytesMaxRatio: 0.20,
// We start increasing the deletion rate when free space falls below 32B; we
// completely disable pacing when it falls below 16GB.
freeSpaceScaler: makeLinearScaler(16*GB, 32*GB),

// We start increasing the deletion rate when obsolete bytes to live bytes
// ratio goes above 5%; we completely disable pacing when it gets to 20%.
obsoleteBytesRatioScaler: makeLinearScaler(5*percent, 20*percent),

// We start increasing the deletion rate when we have 1GB obsolete bytes; we
// completely disable pacing when we have 10GB obsolete bytes.
obsoleteBytesAbsoluteScaler: makeLinearScaler(1*GB, 10*GB),

targetByteDeletionRate: float64(targetByteDeletionRate),

getInfo: getInfo,
}
}

// shouldPace returns true if we should apply rate limiting; this is the
// case when the current free disk space is more than freeSpaceThreshold, and
// the ratio of obsolete to live bytes is less than obsoleteBytesMaxRatio.
func (p *deletionPacer) shouldPace() bool {
// PacingDelay returns the recommended pacing wait time (in seconds) for
// deleting the given number of bytes.
func (p *deletionPacer) PacingDelay(bytesToDelete uint64) (waitSeconds float64) {
if p.targetByteDeletionRate == 0.0 {
// Pacing disabled.
return 0
}

info := p.getInfo()
obsoleteBytesRatio := float64(1.0)
// The scaling is opposite for free space (high value should mean scale 1.0).
scale := 1.0 - p.freeSpaceScaler.scale(float64(info.freeBytes))

if s := p.obsoleteBytesAbsoluteScaler.scale(float64(info.obsoleteBytes)); s < scale {
scale = s
}

obsoleteBytesRatio := 1.0
if info.liveBytes > 0 {
obsoleteBytesRatio = float64(info.obsoleteBytes) / float64(info.liveBytes)
}
return info.freeBytes > p.freeSpaceThreshold && obsoleteBytesRatio < p.obsoleteBytesMaxRatio
if s := p.obsoleteBytesRatioScaler.scale(obsoleteBytesRatio); s < scale {
scale = s
}
return scale * float64(bytesToDelete) / p.targetByteDeletionRate
}

// linearScaler computes a scaling factor in [0.0, 1.0] for a value that we
// want to keep below certain targets.
//
// The factor is 1.0 while the value is at or below the low threshold. Above
// that it decreases linearly, reaching 0.0 at the high threshold (and staying
// 0.0 beyond it).
type linearScaler struct {
	lowThreshold  float64
	highThreshold float64
}

// makeLinearScaler returns a linearScaler with the given thresholds.
func makeLinearScaler(lowThreshold, highThreshold float64) linearScaler {
	return linearScaler{lowThreshold, highThreshold}
}

// scale returns the scaling factor for the given value.
func (s linearScaler) scale(value float64) float64 {
	if value <= s.lowThreshold {
		return 1.0
	}
	if value >= s.highThreshold {
		return 0.0
	}
	// Linearly interpolate between the two thresholds.
	return (s.highThreshold - value) / (s.highThreshold - s.lowThreshold)
}
78 changes: 48 additions & 30 deletions pacer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,43 +12,62 @@ import (
)

func TestDeletionPacerMaybeThrottle(t *testing.T) {
const MB = 1 << 20
const GB = 1 << 30
testCases := []struct {
freeSpaceThreshold uint64
freeBytes uint64
obsoleteBytes uint64
liveBytes uint64
shouldPace bool
freeBytes uint64
obsoleteBytes uint64
liveBytes uint64
// expected scaling of wait time.
expected float64
}{
{
freeSpaceThreshold: 10,
freeBytes: 100,
obsoleteBytes: 1,
liveBytes: 100,
shouldPace: true,
freeBytes: 160 * GB,
obsoleteBytes: 1 * MB,
liveBytes: 160 * MB,
expected: 1.0,
},
// As freeBytes < freeSpaceThreshold, there should be no throttling.
// As freeBytes < free space threshold, there should be no throttling.
{
freeSpaceThreshold: 10,
freeBytes: 5,
obsoleteBytes: 1,
liveBytes: 100,
shouldPace: false,
freeBytes: 5 * GB,
obsoleteBytes: 1 * MB,
liveBytes: 100 * MB,
expected: 0.0,
},
// As obsoleteBytesRatio > 0.20, there should be no throttling.
{
freeSpaceThreshold: 10,
freeBytes: 500,
obsoleteBytes: 50,
liveBytes: 100,
shouldPace: false,
freeBytes: 500 * GB,
obsoleteBytes: 50 * MB,
liveBytes: 100 * MB,
expected: 0.0,
},
// obsoleteBytesRatio (12.5%) is mid-way between 5% and 20%, so the scale should be 0.5.
{
freeBytes: 500 * GB,
obsoleteBytes: 125 * MB,
liveBytes: 1000 * MB,
expected: 0.5,
},
// obsoleteBytes is 10GB, there should be no throttling.
{
freeBytes: 500 * GB,
obsoleteBytes: 10 * GB,
liveBytes: 5000 * GB,
expected: 0.0,
},
// obsoleteBytes is 5.5GB, scale should be 0.5.
{
freeBytes: 500 * GB,
obsoleteBytes: 11 * GB / 2,
liveBytes: 5000 * GB,
expected: 0.5,
},
// When obsolete ratio unknown, there should be no throttling.
{
freeSpaceThreshold: 10,
freeBytes: 500,
obsoleteBytes: 0,
liveBytes: 0,
shouldPace: false,
freeBytes: 500 * GB,
obsoleteBytes: 0,
liveBytes: 0,
expected: 0.0,
},
}
for tcIdx, tc := range testCases {
Expand All @@ -60,10 +79,9 @@ func TestDeletionPacerMaybeThrottle(t *testing.T) {
obsoleteBytes: tc.obsoleteBytes,
}
}
pacer := newDeletionPacer(getInfo)
pacer.freeSpaceThreshold = tc.freeSpaceThreshold
result := pacer.shouldPace()
require.Equal(t, tc.shouldPace, result)
pacer := newDeletionPacer(1, getInfo)
result := pacer.PacingDelay(1)
require.InDelta(t, tc.expected, result, 1e-7)
})
}
}

0 comments on commit 8b43e7b

Please sign in to comment.