Add compaction pacing mechanism
Ryanfsdf committed Jul 15, 2019
1 parent 649af7e commit 608a182
Showing 10 changed files with 363 additions and 87 deletions.
1 change: 1 addition & 0 deletions cmd/pebble/db.go
@@ -54,6 +54,7 @@ func newPebbleDB(dir string) DB {
DisableWAL: disableWAL,
MemTableSize: 64 << 20,
MemTableStopWritesThreshold: 4,
MinCompactionRate: 4 << 20,
MinFlushRate: 4 << 20,
L0CompactionThreshold: 2,
L0StopWritesThreshold: 32,
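Note: the new MinCompactionRate option (4 MB/s here) mirrors the existing MinFlushRate. This diff does not show how the option is wired into the d.compactionLimiter used in compaction.go below; the following is a minimal sketch of one plausible wiring under golang.org/x/time/rate, with hypothetical names (pacingOpts, newLimiters):

package main

import "golang.org/x/time/rate"

// pacingOpts stands in for the relevant pebble.Options fields (assumed).
type pacingOpts struct {
	MinCompactionRate int // bytes/sec floor for compactions, e.g. 4 << 20
	MinFlushRate      int // bytes/sec floor for flushes, e.g. 4 << 20
}

// newLimiters builds token buckets whose refill rate and burst both equal
// the configured floor, matching the Burst()-sized chunking used in
// runCompaction below.
func newLimiters(o pacingOpts) (compactionLimiter, flushLimiter *rate.Limiter) {
	compactionLimiter = rate.NewLimiter(rate.Limit(o.MinCompactionRate), o.MinCompactionRate)
	flushLimiter = rate.NewLimiter(rate.Limit(o.MinFlushRate), o.MinFlushRate)
	return compactionLimiter, flushLimiter
}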
59 changes: 59 additions & 0 deletions compaction.go
@@ -685,6 +685,10 @@ func (d *DB) flush1() error {
return err
}

// Refresh compaction debt estimate since flush has been applied.
d.pendingFlushCompactionDebt = 0
d.compactionDebt = d.mu.versions.picker.estimatedCompactionDebt()

flushed := d.mu.mem.queue[:n]
d.mu.mem.queue = d.mu.mem.queue[n:]
d.updateReadStateLocked()
@@ -807,6 +811,10 @@ func (d *DB) compact1() (err error) {
if err != nil {
return err
}

// Recompute estimated w-amp since compaction has been applied.
d.estimatedWAmp = d.mu.versions.picker.estimatedWAmp()

d.updateReadStateLocked()
d.deleteObsoleteFiles(jobID)
return nil
@@ -852,6 +860,20 @@ func (d *DB) runCompaction(c *compaction) (

snapshots := d.mu.snapshots.toSlice()

// compactionSlowdownThreshold is the low watermark for compaction debt. If
// compaction debt is below this threshold, we slow compactions down to match
// the pace of flushes; if it is above the threshold, compactions run as fast
// as possible. We want to keep compaction debt low while still keeping up
// with flushes, so the threshold is sized such that a single flush cannot
// contribute enough compaction debt to overshoot it.
var compactionSlowdownThreshold uint64
if c.flushing == nil {
compactionSlowdownThreshold =
uint64(d.mu.versions.picker.estimatedMaxWAmp() * float64(d.opts.MemTableSize))
}
d.compactionDebt = d.mu.versions.picker.estimatedCompactionDebt()
d.estimatedWAmp = d.mu.versions.picker.estimatedWAmp()

// Release the d.mu lock while doing I/O.
// Note the unusual order: Unlock and then Lock.
d.mu.Unlock()
@@ -1019,6 +1041,8 @@ func (d *DB) runCompaction(c *compaction) (
flushAmount := c.bytesIterated - prevBytesIterated
prevBytesIterated = c.bytesIterated

d.pendingFlushCompactionDebt += uint64(d.estimatedWAmp * float64(flushAmount))

// We slow down memtable flushing when the dirty bytes indicator falls
// below the low watermark, which is 105% of the memtable size. This will
// only occur if memtable flushing can keep up with the pace of incoming
@@ -1045,6 +1069,41 @@ func (d *DB) runCompaction(c *compaction) (
}
d.flushLimiter.AllowN(time.Now(), int(flushAmount))
}
} else {
var curCompactionDebt uint64
if d.compactionDebt+d.pendingFlushCompactionDebt > c.bytesIterated {
curCompactionDebt = d.compactionDebt + d.pendingFlushCompactionDebt - c.bytesIterated
}

compactAmount := c.bytesIterated - prevBytesIterated
// We slow down compactions when the compaction debt falls below the
// slowdown threshold, which is set dynamically based on the number of
// non-empty levels. This will only occur if compactions can keep up with
// the pace of flushes. If bytes are flushed faster than compactions can
// consume them, compactions proceed at maximum (unthrottled) speed.
if curCompactionDebt <= compactionSlowdownThreshold {
burst := d.compactionLimiter.Burst()
for compactAmount > uint64(burst) {
err := d.compactionLimiter.WaitN(context.Background(), burst)
if err != nil {
return nil, pendingOutputs, err
}
compactAmount -= uint64(burst)
}
err := d.compactionLimiter.WaitN(context.Background(), int(compactAmount))
if err != nil {
return nil, pendingOutputs, err
}
} else {
burst := d.compactionLimiter.Burst()
for compactAmount > uint64(burst) {
d.compactionLimiter.AllowN(time.Now(), burst)
compactAmount -= uint64(burst)
}
d.compactionLimiter.AllowN(time.Now(), int(compactAmount))
}

prevBytesIterated = c.bytesIterated
}
// TODO(peter,rangedel): Need to incorporate the range tombstones in the
// shouldStopBefore decision.
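The pacing branch added to runCompaction above can be read as one chunked token-bucket pattern. Here is a self-contained sketch of it (paceChunk and its signature are hypothetical, not code from this commit), using the same golang.org/x/time/rate calls:

package main

import (
	"context"
	"time"

	"golang.org/x/time/rate"
)

// paceChunk consumes n bytes of work against limiter l. rate.Limiter.WaitN
// returns an error when n exceeds the limiter's burst, so the byte count is
// consumed in burst-sized chunks. When unthrottled, AllowN is still called
// so the token bucket keeps draining without ever blocking the caller.
func paceChunk(l *rate.Limiter, n uint64, throttle bool) error {
	burst := l.Burst()
	for n > uint64(burst) {
		if throttle {
			if err := l.WaitN(context.Background(), burst); err != nil {
				return err
			}
		} else {
			l.AllowN(time.Now(), burst)
		}
		n -= uint64(burst)
	}
	if throttle {
		return l.WaitN(context.Background(), int(n))
	}
	l.AllowN(time.Now(), int(n))
	return nil
}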
121 changes: 117 additions & 4 deletions compaction_picker.go
@@ -12,12 +12,16 @@ import (
// compaction picker is associated with a single version. A new compaction
// picker is created and initialized every time a new version is installed.
type compactionPicker struct {
opts *Options
vers *version

// The level to target for L0 compactions. Levels L1 to baseLevel must be
// empty.
baseLevel int

// smoothedLevelMultiplier is the size ratio between one level and the next.
smoothedLevelMultiplier float64

// levelMaxBytes holds the dynamically adjusted max bytes setting for each
// level.
levelMaxBytes [numLevels]int64
@@ -32,6 +36,7 @@ type compactionPicker struct {

func newCompactionPicker(v *version, opts *Options) *compactionPicker {
p := &compactionPicker{
opts: opts,
vers: v,
}
p.initLevelMaxBytes(v, opts)
@@ -46,6 +51,115 @@ func (p *compactionPicker) compactionNeeded() bool {
return p.score >= 1
}

// estimatedCompactionDebt estimates the number of bytes that need to be
// compacted before the LSM tree becomes stable.
func (p *compactionPicker) estimatedCompactionDebt() uint64 {
if p == nil {
return 0
}

var levelSize uint64
for _, file := range p.vers.files[0] {
levelSize += file.size
}

estimatedCompactionDebt := levelSize
bytesAddedToNextLevel := levelSize

var nextLevelSize uint64
for level := p.baseLevel; level < numLevels-1; level++ {
if nextLevelSize > 0 {
levelSize = nextLevelSize
nextLevelSize = 0
} else {
levelSize = 0
for _, file := range p.vers.files[level] {
levelSize += file.size
}
}

if level == p.baseLevel {
estimatedCompactionDebt += levelSize
}

levelSize += bytesAddedToNextLevel
bytesAddedToNextLevel = 0
if levelSize > uint64(p.levelMaxBytes[level]) {
bytesAddedToNextLevel = levelSize - uint64(p.levelMaxBytes[level])
for _, file := range p.vers.files[level+1] {
nextLevelSize += file.size
}

levelRatio := float64(nextLevelSize) / float64(levelSize)
estimatedCompactionDebt += uint64(float64(bytesAddedToNextLevel) * (levelRatio + 1))
}
}

return estimatedCompactionDebt
}

// estimatedWAmp estimates the write amplification incurred per byte added
// to L0.
func (p *compactionPicker) estimatedWAmp() float64 {
if p == nil {
return 0
}

var levelSize uint64
for _, file := range p.vers.files[0] {
levelSize += file.size
}
// We add memtable size here to account for overflow into the next level.
levelSize += uint64(p.opts.MemTableSize)

estimatedWAmp := 1.0

bytesAddedToNextLevel := levelSize
var nextLevelSize uint64
for level := p.baseLevel; level < numLevels-1; level++ {
if nextLevelSize > 0 {
levelSize = nextLevelSize
nextLevelSize = 0
} else {
levelSize = 0
for _, file := range p.vers.files[level] {
levelSize += file.size
}
}

if level == p.baseLevel {
// We need to use an estimate of average L0 size because the actual L0 size
// is very jumpy.
// TODO(ryan): There is probably a better way to estimate this. Maybe we can
// look at how many L0 files were compacted in the last L0->Lbase compaction.
estimatedAverageL0Size := p.opts.L0CompactionThreshold * p.opts.MemTableSize
estimatedWAmp += float64(levelSize) / float64(estimatedAverageL0Size)
}

levelSize += bytesAddedToNextLevel
bytesAddedToNextLevel = 0
if levelSize > uint64(p.levelMaxBytes[level]) {
bytesAddedToNextLevel = levelSize - uint64(p.levelMaxBytes[level])
for _, file := range p.vers.files[level+1] {
nextLevelSize += file.size
}

levelRatio := float64(nextLevelSize) / float64(levelSize)
estimatedWAmp += levelRatio + 1
} else {
// Return early: once a level fits within its max bytes, deeper levels no
// longer contribute to the write amplification estimate.
return estimatedWAmp
}
}

return estimatedWAmp
}

// estimatedMaxWAmp estimates the maximum possible write amplification per
// byte added to L0.
func (p *compactionPicker) estimatedMaxWAmp() float64 {
return float64(numLevels-p.baseLevel) * (p.smoothedLevelMultiplier + 1)
}

func (p *compactionPicker) initLevelMaxBytes(v *version, opts *Options) {
// Determine the first non-empty level and the maximum size of any level.
firstNonEmptyLevel := -1
@@ -98,19 +212,18 @@ func (p *compactionPicker) initLevelMaxBytes(v *version, opts *Options) {
}
}

- var smoothedLevelMultiplier float64
if p.baseLevel < numLevels-1 {
- smoothedLevelMultiplier = math.Pow(
+ p.smoothedLevelMultiplier = math.Pow(
float64(bottomLevelSize)/float64(baseBytesMax),
1.0/float64(numLevels-p.baseLevel-1))
} else {
- smoothedLevelMultiplier = 1.0
+ p.smoothedLevelMultiplier = 1.0
}

levelSize := float64(baseBytesMax)
for level := p.baseLevel; level < numLevels; level++ {
if level > p.baseLevel && levelSize > 0 {
- levelSize *= smoothedLevelMultiplier
+ levelSize *= p.smoothedLevelMultiplier
}
}
// Round the result since test cases use small target level sizes, which
// can be impacted by floating-point imprecision + integer truncation.
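For intuition about estimatedMaxWAmp and the compactionSlowdownThreshold that runCompaction derives from it, here is a small self-contained example with made-up numbers (7 levels, base level L4, smoothed multiplier 10, and the 64 MB memtable configured in cmd/pebble/db.go):

package main

import "fmt"

// Worked example (hypothetical numbers, not from this commit): how large the
// compaction-debt slowdown threshold ends up being.
func main() {
	const (
		numLevels               = 7
		baseLevel               = 4 // L1..L3 are empty
		smoothedLevelMultiplier = 10.0
		memTableSize            = 64 << 20 // matches MemTableSize in cmd/pebble/db.go
	)
	// Each byte flushed can cross numLevels-baseLevel levels, and each
	// crossing rewrites that byte plus roughly multiplier bytes of the
	// destination level, i.e. (multiplier + 1) per level.
	estimatedMaxWAmp := float64(numLevels-baseLevel) * (smoothedLevelMultiplier + 1)
	threshold := uint64(estimatedMaxWAmp * float64(memTableSize))
	fmt.Printf("max w-amp = %.0f, slowdown threshold = %d bytes (~%.2f GB)\n",
		estimatedMaxWAmp, threshold, float64(threshold)/(1<<30))
	// Output: max w-amp = 33, slowdown threshold = 2214592512 bytes (~2.06 GB)
}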
[Diffs for the remaining 7 changed files are not shown.]
