Skip to content

Commit

Permalink
Add compaction pacing mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
Ryanfsdf committed Jul 29, 2019
1 parent f26236c commit 9a6a745
Show file tree
Hide file tree
Showing 10 changed files with 308 additions and 92 deletions.
3 changes: 2 additions & 1 deletion cmd/pebble/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ func newPebbleDB(dir string) DB {
DisableWAL: disableWAL,
MemTableSize: 64 << 20,
MemTableStopWritesThreshold: 4,
MinFlushRate: 4 << 20,
MinCompactionRate: 4 << 20, // 4 MB/s
MinFlushRate: 1 << 20, // 1 MB/s
L0CompactionThreshold: 2,
L0StopWritesThreshold: 32,
LBaseMaxBytes: 64 << 20, // 64 MB
Expand Down
64 changes: 64 additions & 0 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,9 @@ func (d *DB) flush1() error {
return err
}

// Refresh bytes flushed count.
atomic.StoreUint64(&d.bytesFlushed, 0)

flushed := d.mu.mem.queue[:n]
d.mu.mem.queue = d.mu.mem.queue[n:]
d.updateReadStateLocked()
Expand Down Expand Up @@ -807,6 +810,7 @@ func (d *DB) compact1() (err error) {
if err != nil {
return err
}

d.updateReadStateLocked()
d.deleteObsoleteFiles(jobID)
return nil
Expand Down Expand Up @@ -999,6 +1003,10 @@ func (d *DB) runCompaction(c *compaction) (
totalBytes := d.memTableTotalBytes()
refreshDirtyBytesThreshold := uint64(d.opts.MemTableSize * 5 / 100)

var compactionSlowdownThreshold uint64
var totalCompactionDebt uint64
var estimatedMaxWAmp float64

for key, val := iter.First(); key != nil; key, val = iter.Next() {
// Slow down memtable flushing to match fill rate.
if c.flushing != nil {
Expand All @@ -1019,6 +1027,8 @@ func (d *DB) runCompaction(c *compaction) (
flushAmount := c.bytesIterated - prevBytesIterated
prevBytesIterated = c.bytesIterated

atomic.StoreUint64(&d.bytesFlushed, c.bytesIterated)

// We slow down memtable flushing when the dirty bytes indicator falls
// below the low watermark, which is 105% memtable size. This will only
// occur if memtable flushing can keep up with the pace of incoming
Expand All @@ -1045,6 +1055,60 @@ func (d *DB) runCompaction(c *compaction) (
}
d.flushLimiter.AllowN(time.Now(), int(flushAmount))
}
} else {
bytesFlushed := atomic.LoadUint64(&d.bytesFlushed)

if iterCount >= 1000 || c.bytesIterated > refreshDirtyBytesThreshold {
d.mu.Lock()
estimatedMaxWAmp = d.mu.versions.picker.estimatedMaxWAmp()
// compactionSlowdownThreshold is the low watermark for compaction debt. If compaction
// debt is below this threshold, we slow down compactions. If compaction debt is above
// this threshold, we let compactions continue as fast as possible. We want to keep
// compaction speed as slow as possible to match the speed of flushes. This threshold
// is set so that a single flush cannot contribute enough compaction debt to overshoot
// the threshold.
compactionSlowdownThreshold = uint64(estimatedMaxWAmp * float64(d.opts.MemTableSize))
totalCompactionDebt = d.mu.versions.picker.estimatedCompactionDebt(bytesFlushed)
d.mu.Unlock()
refreshDirtyBytesThreshold = c.bytesIterated + uint64(d.opts.MemTableSize*5/100)
iterCount = 0
}
iterCount++

var curCompactionDebt uint64
if totalCompactionDebt > c.bytesIterated {
curCompactionDebt = totalCompactionDebt - c.bytesIterated
}

compactAmount := c.bytesIterated - prevBytesIterated
// We slow down compactions when the compaction debt falls below the slowdown
// threshold, which is set dynamically based on the number of non-empty levels.
// This will only occur if compactions can keep up with the pace of flushes. If
// bytes are flushed faster than how fast compactions can occur, compactions
// proceed at maximum (unthrottled) speed.
if curCompactionDebt <= compactionSlowdownThreshold {
burst := d.compactionLimiter.Burst()
for compactAmount > uint64(burst) {
err := d.compactionLimiter.WaitN(context.Background(), burst)
if err != nil {
return nil, pendingOutputs, err
}
compactAmount -= uint64(burst)
}
err := d.compactionLimiter.WaitN(context.Background(), int(compactAmount))
if err != nil {
return nil, pendingOutputs, err
}
} else {
burst := d.compactionLimiter.Burst()
for compactAmount > uint64(burst) {
d.compactionLimiter.AllowN(time.Now(), burst)
compactAmount -= uint64(burst)
}
d.compactionLimiter.AllowN(time.Now(), int(compactAmount))
}

prevBytesIterated = c.bytesIterated
}
// TODO(peter,rangedel): Need to incorporate the range tombstones in the
// shouldStopBefore decision.
Expand Down
56 changes: 52 additions & 4 deletions compaction_picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,16 @@ import (
// compaction picker is associated with a single version. A new compaction
// picker is created and initialized every time a new version is installed.
type compactionPicker struct {
opts *Options
vers *version

// The level to target for L0 compactions. Levels L1 to baseLevel must be
// empty.
baseLevel int

// smoothedLevelMultiplier is the size ratio between one level and the next.
smoothedLevelMultiplier float64

// levelMaxBytes holds the dynamically adjusted max bytes setting for each
// level.
levelMaxBytes [numLevels]int64
Expand All @@ -32,6 +36,7 @@ type compactionPicker struct {

func newCompactionPicker(v *version, opts *Options) *compactionPicker {
p := &compactionPicker{
opts: opts,
vers: v,
}
p.initLevelMaxBytes(v, opts)
Expand All @@ -46,6 +51,50 @@ func (p *compactionPicker) compactionNeeded() bool {
return p.score >= 1
}

// estimatedCompactionDebt estimates the number of bytes which need to be
// compacted before the LSM tree becomes stable.
func (p *compactionPicker) estimatedCompactionDebt(l0ExtraSize uint64) uint64 {
if p == nil {
return 0
}

compactionDebt := totalSize(p.vers.files[0]) + l0ExtraSize
bytesAddedToNextLevel := compactionDebt

levelSize := totalSize(p.vers.files[p.baseLevel])
// estimatedL0CompactionSize is the estimated size of the L0 component in the
// current or next L0->LBase compaction. This is needed to estimate the number
// of L0->LBase compactions which will need to occur for the LSM tree to
// become stable.
estimatedL0CompactionSize := uint64(p.opts.L0CompactionThreshold * p.opts.MemTableSize)
// The ratio bytesAddedToNextLevel(L0 Size)/estimatedL0CompactionSize is the
// estimated number of L0->LBase compactions which will need to occur for the
// LSM tree to become stable. We multiply this by levelSize(LBase size) to
// estimate the compaction debt incurred by LBase in the L0->LBase compactions.
compactionDebt += (levelSize * bytesAddedToNextLevel) / estimatedL0CompactionSize

var nextLevelSize uint64
for level := p.baseLevel; level < numLevels - 1; level++ {
levelSize += bytesAddedToNextLevel
bytesAddedToNextLevel = 0
nextLevelSize = totalSize(p.vers.files[level + 1])
if levelSize > uint64(p.levelMaxBytes[level]) {
bytesAddedToNextLevel = levelSize - uint64(p.levelMaxBytes[level])
levelRatio := float64(nextLevelSize)/float64(levelSize)
compactionDebt += uint64(float64(bytesAddedToNextLevel) * (levelRatio + 1))
}
levelSize = nextLevelSize
}

return compactionDebt
}

// estimatedMaxWAmp estimates the maximum possible write amp per byte that is
// added to L0.
func (p *compactionPicker) estimatedMaxWAmp() float64 {
return float64(numLevels - p.baseLevel) * (p.smoothedLevelMultiplier + 1)
}

func (p *compactionPicker) initLevelMaxBytes(v *version, opts *Options) {
// Determine the first non-empty level and the maximum size of any level.
firstNonEmptyLevel := -1
Expand Down Expand Up @@ -98,19 +147,18 @@ func (p *compactionPicker) initLevelMaxBytes(v *version, opts *Options) {
}
}

var smoothedLevelMultiplier float64
if p.baseLevel < numLevels-1 {
smoothedLevelMultiplier = math.Pow(
p.smoothedLevelMultiplier = math.Pow(
float64(bottomLevelSize)/float64(baseBytesMax),
1.0/float64(numLevels-p.baseLevel-1))
} else {
smoothedLevelMultiplier = 1.0
p.smoothedLevelMultiplier = 1.0
}

levelSize := float64(baseBytesMax)
for level := p.baseLevel; level < numLevels; level++ {
if level > p.baseLevel && levelSize > 0 {
levelSize *= smoothedLevelMultiplier
levelSize *= p.smoothedLevelMultiplier
}
// Round the result since test cases use small target level sizes, which
// can be impacted by floating-point imprecision + integer truncation.
Expand Down
156 changes: 73 additions & 83 deletions compaction_picker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,53 +14,62 @@ import (
"github.com/petermattis/pebble/internal/datadriven"
)

func loadVersion(d *datadriven.TestData) (*version, *Options, string) {
opts := &Options{}
opts.EnsureDefaults()

if len(d.CmdArgs) != 1 {
return nil, nil, fmt.Sprintf("%s expects 1 argument", d.Cmd)
}
var err error
opts.LBaseMaxBytes, err = strconv.ParseInt(d.CmdArgs[0].Key, 10, 64)
if err != nil {
return nil, nil, err.Error()
}

vers := &version{}
if len(d.Input) > 0 {
for _, data := range strings.Split(d.Input, "\n") {
parts := strings.Split(data, ":")
if len(parts) != 2 {
return nil, nil, fmt.Sprintf("malformed test:\n%s", d.Input)
}
level, err := strconv.Atoi(parts[0])
if err != nil {
return nil, nil, err.Error()
}
if vers.files[level] != nil {
return nil, nil, fmt.Sprintf("level %d already filled", level)
}
size, err := strconv.ParseUint(strings.TrimSpace(parts[1]), 10, 64)
if err != nil {
return nil, nil, err.Error()
}
if level == 0 {
for i := uint64(0); i < size; i++ {
vers.files[level] = append(vers.files[level], fileMetadata{
size: 1,
})
}
} else {
vers.files[level] = append(vers.files[level], fileMetadata{
size: size,
})
}
}
}

return vers, opts, ""
}

func TestCompactionPickerLevelMaxBytes(t *testing.T) {
datadriven.RunTest(t, "testdata/compaction_picker_level_max_bytes",
func(d *datadriven.TestData) string {
switch d.Cmd {
case "init":
opts := &Options{}
opts.EnsureDefaults()

if len(d.CmdArgs) != 1 {
return fmt.Sprintf("%s expects 1 argument", d.Cmd)
}
var err error
opts.LBaseMaxBytes, err = strconv.ParseInt(d.CmdArgs[0].Key, 10, 64)
if err != nil {
return err.Error()
}

vers := &version{}
if len(d.Input) > 0 {
for _, data := range strings.Split(d.Input, "\n") {
parts := strings.Split(data, ":")
if len(parts) != 2 {
return fmt.Sprintf("malformed test:\n%s", d.Input)
}
level, err := strconv.Atoi(parts[0])
if err != nil {
return err.Error()
}
if vers.files[level] != nil {
return fmt.Sprintf("level %d already filled", level)
}
size, err := strconv.ParseUint(strings.TrimSpace(parts[1]), 10, 64)
if err != nil {
return err.Error()
}
if level == 0 {
for i := uint64(0); i < size; i++ {
vers.files[level] = append(vers.files[level], fileMetadata{
size: 1,
})
}
} else {
vers.files[level] = append(vers.files[level], fileMetadata{
size: size,
})
}
}
vers, opts, errMsg := loadVersion(d)
if errMsg != "" {
return errMsg
}

p := newCompactionPicker(vers, opts)
Expand All @@ -81,52 +90,33 @@ func TestCompactionPickerTargetLevel(t *testing.T) {
func(d *datadriven.TestData) string {
switch d.Cmd {
case "pick":
opts := &Options{}
opts.EnsureDefaults()

if len(d.CmdArgs) != 1 {
return fmt.Sprintf("%s expects 1 argument", d.Cmd)
}
var err error
opts.LBaseMaxBytes, err = strconv.ParseInt(d.CmdArgs[0].Key, 10, 64)
if err != nil {
return err.Error()
vers, opts, errMsg := loadVersion(d)
if errMsg != "" {
return errMsg
}

vers := &version{}
if len(d.Input) > 0 {
for _, data := range strings.Split(d.Input, "\n") {
parts := strings.Split(data, ":")
if len(parts) != 2 {
return fmt.Sprintf("malformed test:\n%s", d.Input)
}
level, err := strconv.Atoi(parts[0])
if err != nil {
return err.Error()
}
if vers.files[level] != nil {
return fmt.Sprintf("level %d already filled", level)
}
size, err := strconv.ParseUint(strings.TrimSpace(parts[1]), 10, 64)
if err != nil {
return err.Error()
}
if level == 0 {
for i := uint64(0); i < size; i++ {
vers.files[level] = append(vers.files[level], fileMetadata{
size: 1,
})
}
} else {
vers.files[level] = append(vers.files[level], fileMetadata{
size: size,
})
}
}
p := newCompactionPicker(vers, opts)
return fmt.Sprintf("%d: %.1f\n", p.level, p.score)

default:
return fmt.Sprintf("unknown command: %s", d.Cmd)
}
})
}

func TestCompactionPickerEstimatedCompactionDebt(t *testing.T) {
datadriven.RunTest(t, "testdata/compaction_picker_estimated_debt",
func(d *datadriven.TestData) string {
switch d.Cmd {
case "init":
vers, opts, errMsg := loadVersion(d)
if errMsg != "" {
return errMsg
}
opts.MemTableSize = 1000

p := newCompactionPicker(vers, opts)
return fmt.Sprintf("%d: %.1f\n", p.level, p.score)
return fmt.Sprintf("%d\n", p.estimatedCompactionDebt(0))

default:
return fmt.Sprintf("unknown command: %s", d.Cmd)
Expand Down
Loading

0 comments on commit 9a6a745

Please sign in to comment.