compaction: limit expansion by disk's available bytes
Incorporate the disk's available capacity into the
`expandedCompactionByteSizeLimit` calculation. Limit the maximum
expansion to half the disk's available capacity, divided by the maximum
compaction concurrency.
jbowens committed Aug 26, 2021
1 parent 3da05e7 commit 2b4f90f
Showing 9 changed files with 206 additions and 83 deletions.
42 changes: 36 additions & 6 deletions compaction.go
@@ -36,8 +36,20 @@ var gcLabels = pprof.Labels("pebble", "gc")
// expandedCompactionByteSizeLimit is the maximum number of bytes in all
// compacted files. We avoid expanding the lower level file set of a compaction
// if it would make the total compaction cover more than this many bytes.
func expandedCompactionByteSizeLimit(opts *Options, level int) uint64 {
return uint64(25 * opts.Level(level).TargetFileSize)
func expandedCompactionByteSizeLimit(opts *Options, level int, availBytes uint64) uint64 {
v := uint64(25 * opts.Level(level).TargetFileSize)

// Never expand a compaction beyond half the available capacity, divided
// by the maximum number of concurrent compactions. Each of the concurrent
// compactions may expand up to this limit, so this attempts to limit
// compactions to half of available disk space. Note that this will not
// prevent compaction picking from pursuing compactions that are larger
// than this threshold before expansion.
diskMax := (availBytes / 2) / uint64(opts.MaxConcurrentCompactions)
if v > diskMax {
v = diskMax
}
return v
}

// maxGrandparentOverlapBytes is the maximum bytes of overlap with level+1
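
Taken together, the new limit is min(25 × TargetFileSize, (availBytes / 2) / MaxConcurrentCompactions). A minimal standalone sketch of that arithmetic; the function name and constants below are illustrative, not Pebble defaults:

package main

import "fmt"

// expandedLimit mirrors the formula introduced in this commit: the 25×
// target-file-size heuristic, clamped to half the disk's available bytes
// divided by the maximum compaction concurrency.
func expandedLimit(targetFileSize int64, maxConcurrentCompactions int, availBytes uint64) uint64 {
	v := uint64(25 * targetFileSize)
	diskMax := (availBytes / 2) / uint64(maxConcurrentCompactions)
	if v > diskMax {
		v = diskMax
	}
	return v
}

func main() {
	const mib = 1 << 20
	const gib = 1 << 30
	// Plenty of free space: the 25× heuristic binds (25 × 64 MiB = 1600 MiB).
	fmt.Println(expandedLimit(64*mib, 2, 500*gib) / mib) // 1600
	// Nearly full disk: the disk clamp binds ((4 GiB / 2) / 2 = 1024 MiB).
	fmt.Println(expandedLimit(64*mib, 2, 4*gib) / mib) // 1024
}
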
@@ -1279,15 +1291,27 @@ func (d *DB) getFlushPacerInfo() flushPacerInfo {
return pacerInfo
}

func (d *DB) calculateDiskAvailableBytes() uint64 {
if space, err := d.opts.FS.GetDiskUsage(d.dirname); err == nil {
atomic.StoreUint64(&d.atomic.diskAvailBytes, space.AvailBytes)
return space.AvailBytes
} else if !errors.Is(err, vfs.ErrUnsupported) {
d.opts.EventListener.BackgroundError(err)
}
return atomic.LoadUint64(&d.atomic.diskAvailBytes)
}

func (d *DB) getDiskAvailableBytesCached() uint64 {
return atomic.LoadUint64(&d.atomic.diskAvailBytes)
}

func (d *DB) getDeletionPacerInfo() deletionPacerInfo {
var pacerInfo deletionPacerInfo
// Call GetDiskUsage after every file deletion. This may seem inefficient,
// but in practice this was observed to take constant time, regardless of
// volume size used, at least on linux with ext4 and zfs. All invocations
// take 10 microseconds or less.
if space, err := d.opts.FS.GetDiskUsage(d.dirname); err == nil {
pacerInfo.freeBytes = space.AvailBytes
}
pacerInfo.freeBytes = d.calculateDiskAvailableBytes()
d.mu.Lock()
pacerInfo.obsoleteBytes = d.mu.versions.metrics.Table.ObsoleteSize
pacerInfo.liveBytes = uint64(d.mu.versions.metrics.Total().Size)
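
The calculateDiskAvailableBytes / getDiskAvailableBytesCached pair implements a read-mostly cache: writers refresh it opportunistically on deletions, flushes, and compaction completion, while readers only perform an atomic load. A standalone sketch of the same pattern, with hypothetical stand-ins (diskStat, errUnsupported) for the Pebble types:

package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// errUnsupported stands in for vfs.ErrUnsupported in this sketch.
var errUnsupported = errors.New("disk usage unsupported")

// diskStat caches the last successfully fetched available-bytes figure so
// hot paths (e.g. compaction picking) can read it without a syscall.
type diskStat struct {
	availBytes uint64 // accessed atomically
	fetch      func() (uint64, error)
}

// refresh queries the filesystem and updates the cache, falling back to
// the last cached value when the filesystem cannot report usage.
func (s *diskStat) refresh() uint64 {
	if avail, err := s.fetch(); err == nil {
		atomic.StoreUint64(&s.availBytes, avail)
		return avail
	} else if !errors.Is(err, errUnsupported) {
		fmt.Println("background error:", err) // EventListener.BackgroundError in Pebble
	}
	return atomic.LoadUint64(&s.availBytes)
}

// cached returns the last known value without touching the filesystem.
func (s *diskStat) cached() uint64 {
	return atomic.LoadUint64(&s.availBytes)
}

func main() {
	s := &diskStat{fetch: func() (uint64, error) { return 42 << 30, nil }}
	s.refresh()
	fmt.Println(s.cached()) // 45097156608 (42 GiB)
}
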
@@ -2493,12 +2517,18 @@ func (d *DB) runCompaction(
return nil, pendingOutputs, err
}

// Refresh the disk available statistic whenever a compaction/flush
// completes, before re-acquiring the mutex.
_ = d.calculateDiskAvailableBytes()

return ve, pendingOutputs, nil
}

// validateVersionEdit validates that start and end keys across new and deleted
// files in a versionEdit pass the given validation function.
func validateVersionEdit(ve *versionEdit, validateFn func([]byte) error, format base.FormatKey) error {
func validateVersionEdit(
ve *versionEdit, validateFn func([]byte) error, format base.FormatKey,
) error {
if validateFn == nil {
return nil
}
95 changes: 64 additions & 31 deletions compaction_picker.go
@@ -110,6 +110,10 @@ type pickedCompaction struct {
// equal to startLevel+1 except when startLevel is 0 in which case it is
// equal to compactionPicker.baseLevel().
outputLevel *compactionLevel
// adjustedOutputLevel is the output level used for the purpose of
// determining the target output file size, overlap bytes, and expanded
// bytes, taking into account the base level.
adjustedOutputLevel int

inputs []compactionLevel

@@ -123,10 +127,6 @@ type pickedCompaction struct {
// maxOverlapBytes is the maximum number of bytes of overlap allowed for a
// single output table with the tables in the grandparent level.
maxOverlapBytes uint64
// maxExpandedBytes is the maximum size of an expanded compaction. If growing
// a compaction results in a larger size, the original compaction is used
// instead.
maxExpandedBytes uint64

// The boundaries of the input data.
smallest InternalKey
@@ -154,12 +154,12 @@ func newPickedCompaction(opts *Options, cur *version, startLevel, baseLevel int)
adjustedOutputLevel := 1 + outputLevel - baseLevel

pc := &pickedCompaction{
cmp: opts.Comparer.Compare,
version: cur,
inputs: []compactionLevel{{level: startLevel}, {level: outputLevel}},
maxOutputFileSize: uint64(opts.Level(adjustedOutputLevel).TargetFileSize),
maxOverlapBytes: maxGrandparentOverlapBytes(opts, adjustedOutputLevel),
maxExpandedBytes: expandedCompactionByteSizeLimit(opts, adjustedOutputLevel),
cmp: opts.Comparer.Compare,
version: cur,
inputs: []compactionLevel{{level: startLevel}, {level: outputLevel}},
adjustedOutputLevel: adjustedOutputLevel,
maxOutputFileSize: uint64(opts.Level(adjustedOutputLevel).TargetFileSize),
maxOverlapBytes: maxGrandparentOverlapBytes(opts, adjustedOutputLevel),
}
pc.startLevel = &pc.inputs[0]
pc.outputLevel = &pc.inputs[1]
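
For example, with baseLevel = 4, an L0 → L4 compaction gets adjustedOutputLevel = 1 + 4 − 4 = 1 and is governed by the L1 options, while an L5 → L6 compaction maps to 1 + 6 − 4 = 3. Treating Lbase as if it were L1 keeps the per-level option table meaningful when the LSM's upper levels are empty.
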
@@ -191,7 +191,12 @@ func newPickedCompactionFromL0(
return pc
}

func (pc *pickedCompaction) setupInputs() bool {
func (pc *pickedCompaction) setupInputs(opts *Options, diskAvailBytes uint64) bool {
// maxExpandedBytes is the maximum size of an expanded compaction. If
// growing a compaction results in a larger size, the original compaction
// is used instead.
maxExpandedBytes := expandedCompactionByteSizeLimit(opts, pc.adjustedOutputLevel, diskAvailBytes)

// Expand the initial inputs to a clean cut.
var isCompacting bool
pc.startLevel.files, isCompacting = expandToAtomicUnit(pc.cmp, pc.startLevel.files, false /* disableIsCompacting */)
@@ -268,15 +273,15 @@ func (pc *pickedCompaction) setupInputs() bool {
sizeSum += f.Size
}
}
if sizeSum+pc.outputLevel.files.SizeSum() < pc.maxExpandedBytes {
if sizeSum+pc.outputLevel.files.SizeSum() < maxExpandedBytes {
pc.startLevel.files = manifest.NewLevelSliceSeqSorted(newStartLevelFiles)
pc.smallest, pc.largest = manifest.KeyRange(pc.cmp,
pc.startLevel.files.Iter(), pc.outputLevel.files.Iter())
} else {
*pc.lcf = oldLcf
}
}
} else if pc.grow(pc.smallest, pc.largest) {
} else if pc.grow(pc.smallest, pc.largest, maxExpandedBytes) {
pc.smallest, pc.largest = manifest.KeyRange(pc.cmp,
pc.startLevel.files.Iter(), pc.outputLevel.files.Iter())
}
@@ -286,7 +291,7 @@
// grow grows the number of inputs at c.level without changing the number of
// c.level+1 files in the compaction, and returns whether the inputs grew. sm
// and la are the smallest and largest InternalKeys in all of the inputs.
func (pc *pickedCompaction) grow(sm, la InternalKey) bool {
func (pc *pickedCompaction) grow(sm, la InternalKey, maxExpandedBytes uint64) bool {
if pc.outputLevel.files.Empty() {
return false
}
@@ -298,7 +303,7 @@
if grow0.Len() <= pc.startLevel.files.Len() {
return false
}
if grow0.SizeSum()+pc.outputLevel.files.SizeSum() >= pc.maxExpandedBytes {
if grow0.SizeSum()+pc.outputLevel.files.SizeSum() >= maxExpandedBytes {
return false
}
sm1, la1 := manifest.KeyRange(pc.cmp, grow0.Iter())
@@ -418,12 +423,17 @@ func expandToAtomicUnit(
}

func newCompactionPicker(
v *version, opts *Options, inProgressCompactions []compactionInfo, levelSizes [numLevels]int64,
v *version,
opts *Options,
inProgressCompactions []compactionInfo,
levelSizes [numLevels]int64,
diskAvailBytes func() uint64,
) compactionPicker {
p := &compactionPickerByScore{
opts: opts,
vers: v,
levelSizes: levelSizes,
opts: opts,
vers: v,
levelSizes: levelSizes,
diskAvailBytes: diskAvailBytes,
}
p.initLevelMaxBytes(inProgressCompactions)
return p
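
Threading diskAvailBytes through as a func() uint64 rather than a snapshot means every pick re-reads the cache, so a long-lived picker never acts on a value taken at construction time. A standalone sketch of that design choice; the types and names here are illustrative, not Pebble's:

package main

import (
	"fmt"
	"sync/atomic"
)

// picker holds a closure instead of a uint64 so each pick sees the
// current cached statistic.
type picker struct {
	diskAvailBytes func() uint64
}

func (p *picker) pick() uint64 {
	// Re-read the cache on every pick.
	return p.diskAvailBytes() / 2
}

func main() {
	var avail uint64 = 100 << 30
	p := &picker{diskAvailBytes: func() uint64 { return atomic.LoadUint64(&avail) }}
	fmt.Println(p.pick() >> 30) // 50

	atomic.StoreUint64(&avail, 10<<30) // disk fills up between picks
	fmt.Println(p.pick() >> 30)        // 5
}
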
@@ -521,6 +531,19 @@ type compactionPickerByScore struct {

// levelSizes holds the current size of each level.
levelSizes [numLevels]int64

// diskAvailBytes returns a cached statistic on the number of bytes
// available on disk, as reported by the filesystem. It's used to be more
// restrictive in expanding compactions if available disk space is
// limited.
//
// The cached value is updated whenever a file is deleted and
// whenever a compaction or flush completes. Since file removal is
// the primary means of reclaiming space, there is a rough bound on
// the statistic's staleness when available bytes is growing.
// Compactions and flushes are longer, slower operations and provide
// a much looser bound when available bytes is decreasing.
diskAvailBytes func() uint64
}

var _ compactionPicker = &compactionPickerByScore{}
@@ -959,7 +982,7 @@ func (p *compactionPickerByScore) pickAuto(env compactionEnv) (pc *pickedCompact
}

if info.level == 0 {
pc = pickL0(env, p.opts, p.vers, p.baseLevel)
pc = pickL0(env, p.opts, p.vers, p.baseLevel, p.diskAvailBytes)
// Fail-safe to protect against compacting the same sstable
// concurrently.
if pc != nil && !inputRangeAlreadyCompacting(env, pc) {
@@ -979,7 +1002,7 @@
continue
}

pc := pickAutoHelper(env, p.opts, p.vers, *info, p.baseLevel)
pc := pickAutoHelper(env, p.opts, p.vers, *info, p.baseLevel, p.diskAvailBytes)
// Fail-safe to protect against compacting the same sstable concurrently.
if pc != nil && !inputRangeAlreadyCompacting(env, pc) {
pc.score = info.score
@@ -1100,7 +1123,12 @@ func (p *compactionPickerByScore) pickElisionOnlyCompaction(
}

func pickAutoHelper(
env compactionEnv, opts *Options, vers *version, cInfo candidateLevelInfo, baseLevel int,
env compactionEnv,
opts *Options,
vers *version,
cInfo candidateLevelInfo,
baseLevel int,
diskAvailBytes func() uint64,
) (pc *pickedCompaction) {
if cInfo.outputLevel == 0 {
return pickIntraL0(env, opts, vers)
@@ -1121,15 +1149,17 @@
}
}

if !pc.setupInputs() {
if !pc.setupInputs(opts, diskAvailBytes()) {
return nil
}
return pc
}

// Helper method to pick compactions originating from L0. Uses information about
// sublevels to generate a compaction.
func pickL0(env compactionEnv, opts *Options, vers *version, baseLevel int) (pc *pickedCompaction) {
func pickL0(
env compactionEnv, opts *Options, vers *version, baseLevel int, diskAvailBytes func() uint64,
) (pc *pickedCompaction) {
// It is important to pass information about Lbase files to L0Sublevels
// so it can pick a compaction that does not conflict with an Lbase => Lbase+1
// compaction. Without this, we observed reduced concurrency of L0=>Lbase
@@ -1144,7 +1174,7 @@ func pickL0(env compactionEnv, opts *Options, vers *version, baseLevel int) (pc
}
if lcf != nil {
pc = newPickedCompactionFromL0(lcf, opts, vers, baseLevel, true)
pc.setupInputs()
pc.setupInputs(opts, diskAvailBytes())
if pc.startLevel.files.Empty() {
opts.Logger.Fatalf("empty compaction chosen")
}
@@ -1162,7 +1192,7 @@
}
if lcf != nil {
pc = newPickedCompactionFromL0(lcf, opts, vers, 0, false)
if !pc.setupInputs() {
if !pc.setupInputs(opts, diskAvailBytes()) {
return nil
}
if pc.startLevel.files.Empty() {
Expand Down Expand Up @@ -1276,7 +1306,6 @@ func pickIntraL0(env compactionEnv, opts *Options, vers *version) (pc *pickedCom
// thus read all of the L0 sstables anyways, even if they are partitioned.
pc.maxOutputFileSize = math.MaxUint64
pc.maxOverlapBytes = math.MaxUint64
pc.maxExpandedBytes = math.MaxUint64
return pc
}

@@ -1304,7 +1333,7 @@ func (p *compactionPickerByScore) pickManual(
if conflictsWithInProgress(manual.level, outputLevel, env.inProgressCompactions) {
return nil, true
}
pc = pickManualHelper(p.opts, manual, p.vers, p.baseLevel)
pc = pickManualHelper(p.opts, manual, p.vers, p.baseLevel, p.diskAvailBytes)
if pc == nil {
return nil, false
}
@@ -1319,7 +1348,11 @@
}

func pickManualHelper(
opts *Options, manual *manualCompaction, vers *version, baseLevel int,
opts *Options,
manual *manualCompaction,
vers *version,
baseLevel int,
diskAvailBytes func() uint64,
) (pc *pickedCompaction) {
pc = newPickedCompaction(opts, vers, manual.level, baseLevel)
manual.outputLevel = pc.outputLevel.level
@@ -1329,7 +1362,7 @@
// Nothing to do
return nil
}
if !pc.setupInputs() {
if !pc.setupInputs(opts, diskAvailBytes()) {
return nil
}
return pc
@@ -1370,7 +1403,7 @@ func pickReadTriggeredCompactionHelper(
}
pc = newPickedCompaction(p.opts, p.vers, rc.level, p.baseLevel)
pc.startLevel.files = overlapSlice
if !pc.setupInputs() {
if !pc.setupInputs(p.opts, p.diskAvailBytes()) {
return nil
}
if inputRangeAlreadyCompacting(env, pc) {