[WIP] *: Wire up compaction picking with L0Sublevels, enable flush splits #642

Closed
371 changes: 255 additions & 116 deletions compaction.go

Large diffs are not rendered by default.

253 changes: 234 additions & 19 deletions compaction_picker.go
@@ -110,6 +110,11 @@ type compactionPickerByScore struct {
// L0->Lbase compaction if there is not one in-progress, or the score for an
// intra-L0 compaction if there is in-progress L0->Lbase compaction.
scores [numLevels]pickedCompactionInfo

// The ratio of the bytes in level to the previous higher level. For Lbase,
// this will be the ratio of bytes(Lbase)/bytes(L0).
// See the comment in pickAuto().
currentByteRatios [numLevels]float64
}

var _ compactionPicker = &compactionPickerByScore{}
@@ -179,15 +184,24 @@ func (p *compactionPickerByScore) initLevelMaxBytes(inProgressCompactions []comp
// Determine the first non-empty level and the maximum size of any level.
firstNonEmptyLevel := -1
var bottomLevelSize int64
l0Size := int64(totalSize(p.vers.Files[0]))
dbSize := l0Size
prevLevelSize := l0Size
for level := 1; level < numLevels; level++ {
levelSize := int64(totalSize(p.vers.Files[level]))
dbSize += levelSize
if levelSize > 0 {
if firstNonEmptyLevel == -1 {
firstNonEmptyLevel = level
}
bottomLevelSize = levelSize
if prevLevelSize > 0 {
p.currentByteRatios[level] = float64(levelSize) / float64(prevLevelSize)
}
prevLevelSize = levelSize
}
}
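// For example (illustrative numbers, not from this PR): with L1 = 100MB,
// L2 = 1GB and L3 = 10GB, currentByteRatios[2] and currentByteRatios[3]
// are both 10. Empty levels are skipped and their ratio entries stay 0.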

for _, c := range inProgressCompactions {
if c.outputLevel == 0 {
continue
@@ -213,6 +227,12 @@ }
}
return
}
// To handle the case where the LSM is not in the right shape (see the
// comment in pickAuto()), we don't want to pick an Lbase that is too
// low based solely on the bytes in the bottom level.
if float64(bottomLevelSize) < 0.8*float64(dbSize) {
bottomLevelSize = int64(0.8 * float64(dbSize))
}
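// For example (illustrative numbers): with dbSize = 100GB and a 40GB
// bottom level, bottomLevelSize is raised to 80GB, so the Lbase computed
// below is not pushed artificially low by a temporarily small bottom level.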

levelMultiplier := 10.0

@@ -301,6 +321,28 @@ func (p *compactionPickerByScore) initScores(inProgressCompactions []compactionI
}

func (p *compactionPickerByScore) initL0Score(inProgressCompactions []compactionInfo) {
if p.vers.L0SubLevels != nil {
// fmt.Printf("depth: %d, threshold: %d\n", p.vers.L0SubLevels.MaxDepthAfterOngoingCompactions(),
// p.opts.L0CompactionThreshold)

// Note that there is an inconsistency between what we do here and how
// CockroachDB backpressure is implemented. The latter simply uses the
// sublevel count as a measure of read amplification. In contrast,
// L0SubLevels uses the maximum number of files across all intervals,
// which can be lower than the number of sublevels. For a Get that is
// using iterator bounds, the interval depth is a more precise measure of
// read amplification, since levelIter will not open a file in a sublevel
// that has no file overlapping with key k if the iterator upper bound is
// set to k. I used the sublevel count for the CockroachDB backpressure
// simply because it was already plumbed through, and the backpressure
// settings for the experiment are set very high anyway (so we are not
// trying to exercise precise control) -- but we should consider changing
// to the more accurate number here.
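// To illustrate the difference (hypothetical L0 shape, not from this
// PR): suppose L0 has two sublevels whose files do not overlap in the
// key space:
//
//   sublevel 1:            [e - f]
//   sublevel 0:  [a - b]
//
// The sublevel count is 2, but no key interval has more than one
// overlapping file, so the maximum interval depth is 1.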
p.scores[0].score =
float64(p.vers.L0SubLevels.MaxDepthAfterOngoingCompactions()) / float64(p.opts.L0CompactionThreshold)
return
}

// TODO(peter): The current scoring logic precludes concurrent L0->Lbase
// compactions in most cases because if there is an in-progress L0->Lbase
// compaction we'll instead preferentially score an intra-L0 compaction. One
@@ -413,6 +455,18 @@ func (p *compactionPickerByScore) pickFile(level int) int {
return file
}

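// sortCompactionLevels implements sort.Interface, ordering
// pickedCompactionInfos by ascending level. It is used below so that,
// within the high priority bucket, compactions out of higher levels
// (lower level numbers, starting with L0) are attempted first.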
type sortCompactionLevels []*pickedCompactionInfo

func (s sortCompactionLevels) Len() int {
return len(s)
}
func (s sortCompactionLevels) Less(i, j int) bool {
return s[i].level < s[j].level
}
func (s sortCompactionLevels) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}

// pickAuto picks the best compaction, if any.
//
// On each call, pickAuto computes per-level size adjustments based on
@@ -423,35 +477,196 @@ func (p *compactionPickerByScore) pickFile(level int) int {
// If a score-based compaction cannot be found, pickAuto falls back to looking
// for a forced compaction (identified by FileMetadata.MarkedForCompaction).
func (p *compactionPickerByScore) pickAuto(env compactionEnv) (c *compaction) {
countBaseCompactions := 0
countIntraL0Compactions := 0
for i := range env.inProgressCompactions {
if env.inProgressCompactions[i].startLevel == 0 {
if env.inProgressCompactions[i].outputLevel == 0 {
countIntraL0Compactions++
} else {
countBaseCompactions++
}
}
}
// The following logic tries to accommodate situations where bytes are
// being added at such a high rate that it is not viable to maintain a
// proper LSM shape even with concurrent compactions out of each level
// (including L0). That is, there is simply not enough compaction
// bandwidth available.
// - currentByteRatios allows us to assess the harm caused by a level
// having more bytes than the target. Say L3 has 10GB and the target is
// 1GB: it is harmful if L2 has, say, 100MB, since that implies a write
// amplification of 100 when bytes are compacted from L2 to L3. However,
// if L2 is also behind and has, say, 5GB, the write amplification for
// the compaction is fine. In that case the score of 10 for L3 is doing
// no harm and that compaction can be delayed.
//
// - The write amplification is an average number. In reality, it depends
// on the key byte distribution for a level. So there is a score beyond
// which even a low currentByteRatios is not sufficient to delay
// compaction -- see the comment in the if-condition below.
//
// - We segment compactions into 3 priority levels:
// - If the score is > 4, the compaction is either highest or high
// priority.
// - Highest priority is assigned based on a very high score, or on
// currentByteRatios getting high.
// - The rest are high priority. Within high priority we prefer higher
// levels first. This is justified as follows:
// - We want to get bytes out of L0, which is the highest level.
// - There could be deletes sitting in higher levels that are better
// to apply first.
// This generalizes to restoring the LSM to health one level at a
// time, starting from the highest level. In practice, if a
// compaction can't be picked from a higher level, we'll pick one
// from a lower level.
// - The remaining are low priority.
//
// We typically observed both L0 and Lbase being categorized as high
// priority (this was the large TPCC import with a compaction concurrency
// of 3). If L0=>Lbase was frequently able to run 3 concurrent
// compactions, eventually Lbase would fall behind enough to become
// highest priority and use up some of the compaction slots.

// This is a different notion of priority from the one described above: it
// is a subdivision within the low priority compactions, used to reduce
// compaction concurrency -- see the code below.
const highPriorityThreshold = 1.5
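// (Consulted below: when a compaction is already running, a level scoring
// under this threshold is not considered worth starting another
// compaction for.)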

p.initSizeAdjust(env.inProgressCompactions)
p.initScores(env.inProgressCompactions)

l0Score := 0.0
var highestPriority []*pickedCompactionInfo
var highPriority []*pickedCompactionInfo
var lowPriority []*pickedCompactionInfo
for i := range p.scores {
info := &p.scores[i]
if info.level == 0 {
l0Score = info.score
}
if info.score > 4 {
// The info.score > 50 heuristic is based on noticing that the byte
// ratios are only an aggregate across the whole key space, and when
// Lbase falls too far behind, the mix of observed input bytes makes an
// L0 => Lbase compaction unproductive -- for example, 12MB from L0 +
// 1GB from Lbase compacted into Lbase.
if p.currentByteRatios[info.level] > 5 || info.score > 50 {
highestPriority = append(highestPriority, info)
} else {
highPriority = append(highPriority, info)
}
} else {
lowPriority = append(lowPriority, info)
}
}
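// For example (illustrative): with score 10, a level lands in
// highestPriority if its byte ratio exceeds 5 and in highPriority
// otherwise; a score above 50 is highestPriority regardless of ratio;
// scores <= 4 land in lowPriority.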
// If L0 has a high score, consider an intra-L0 compaction. This number
// is a rough guess based on one early TPCC import experiment. Other
// improvements subsequent to that kept the L0 score < 20, so one could
// possibly reduce this threshold.
considerIntraL0 := l0Score > 50

p.opts.Logger.Infof(
"pickAuto: in-progress: %d, max: %d, L0 compactions: base: %d, intra-L0: %d, l0score: %f, highest-score,level: %f, %d, lengths: %d, %d, %d",
len(env.inProgressCompactions), p.opts.MaxConcurrentCompactions, countBaseCompactions,
countIntraL0Compactions, l0Score, p.scores[0].score, p.scores[0].level,
len(highestPriority), len(highPriority), len(lowPriority))
sort.Sort(sortCompactionLevels(highPriority))

// Check for a score-based compaction. "scores" has been sorted in order of
// decreasing score. For each level with a score >= 1, we attempt to find a
// compaction anchored at that level.
for _, comList := range [3][]*pickedCompactionInfo{highestPriority, highPriority, lowPriority} {
for _, info := range comList {
if len(env.inProgressCompactions) > 0 && info.score < highPriorityThreshold {
// Don't start a low priority compaction if there is already a compaction
// running.
return nil
}
if info.score < 1 {
break
}

if info.level == 0 && p.vers.L0SubLevels != nil {
// It is important to pass information about Lbase files to L0SubLevels
// so it can pick a compaction that does not conflict with an Lbase => Lbase+1
// compaction. Without this, we observed reduced concurrency of L0=>Lbase
// compactions, and increased read amplification in L0.
lcf, err := p.vers.L0SubLevels.PickBaseCompaction(
p.opts.L0CompactionThreshold, p.vers.Files[p.baseLevel])
if err != nil {
p.opts.Logger.Infof("error when picking base compaction: %s", err.Error())
continue
} else if lcf != nil {
c = newCompaction(p.opts, p.vers, 0, p.baseLevel, env.bytesCompacted)
if c.outputLevel != p.baseLevel {
panic("pebble: compaction picked unexpected output level")
}
c.inputs[0] = make([]*manifest.FileMetadata, 0, len(lcf.Files))
for j := range lcf.FilesIncluded {
if lcf.FilesIncluded[j] {
c.inputs[0] = append(c.inputs[0], p.vers.Files[0][j])
}
}
if len(c.inputs[0]) == 0 {
panic("pebble: empty compaction")
}
c.setupInputsForAutoL0ToBase(p.vers.L0SubLevels, lcf)
if len(c.inputs[0]) == 0 {
panic("pebble: empty compaction")
}
// Fail-safe to protect against compacting the same sstable concurrently.
if inputAlreadyCompacting(c) {
// PickBaseCompaction should have already prevented this except for
// atomic compaction groups.
p.opts.Logger.Infof("pickAuto: failed to pick L0=>Lbase due to Lbase")
continue
}
c.score = info.score
return c
}
if considerIntraL0 {
lcf, err = p.vers.L0SubLevels.PickIntraL0Compaction(env.earliestUnflushedSeqNum, minIntraL0Count)
if err != nil {
p.opts.Logger.Infof("error when picking Intra-L0 compaction: %s", err.Error())
continue
} else if lcf != nil {
// TODO: derive from pickIntraL0 instead of pickAutoHelper. Specifically, cannot
// call setupInputs().
c = newCompaction(p.opts, p.vers, 0, 0, env.bytesCompacted)
c.inputs[0] = make([]*manifest.FileMetadata, 0, len(lcf.Files))
for j := range lcf.FilesIncluded {
if lcf.FilesIncluded[j] {
c.inputs[0] = append(c.inputs[0], p.vers.Files[0][j])
}
}
if len(c.inputs[0]) == 0 {
panic("pebble: empty compaction")
}
c.smallest, c.largest = manifest.KeyRange(c.cmp, c.inputs[0], nil)
c.setupInuseKeyRanges()
// Output only a single sstable for intra-L0 compactions.
// Now that we have the ability to split flushes, we could conceivably
// split the output of intra-L0 compactions too. This may be unnecessary
// complexity -- the inputs to intra-L0 should be narrow in the key space
// (unlike flushes), so writing a single sstable should be ok.
c.maxOutputFileSize = math.MaxUint64
c.maxOverlapBytes = math.MaxUint64
c.maxExpandedBytes = math.MaxUint64
return c
}
}
continue
}
info.file = p.pickFile(info.level)
if info.file == -1 {
continue
}

c := pickAutoHelper(env, p.opts, p.vers, *info, p.baseLevel)
// Fail-safe to protect against compacting the same sstable concurrently.
if c != nil && !inputAlreadyCompacting(c) {
c.score = info.score
return c
}
}
}
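
For readers skimming the diff, the bucketing and in-bucket ordering above can be summarized in a minimal, self-contained Go sketch (simplified types and invented names such as levelInfo and classify, purely illustrative -- not code from this PR):

package main

import (
	"fmt"
	"sort"
)

// levelInfo is a stand-in for pickedCompactionInfo in this sketch.
type levelInfo struct {
	level     int
	score     float64
	byteRatio float64 // bytes(level) / bytes(previous higher level)
}

// classify mirrors the pickAuto bucketing: a score > 4 is highest or high
// priority (highest when the byte ratio exceeds 5 or the score exceeds 50);
// everything else is low priority. The high priority bucket is then sorted
// so that higher levels (lower level numbers) come first.
func classify(levels []levelInfo) (highest, high, low []levelInfo) {
	for _, l := range levels {
		switch {
		case l.score > 4 && (l.byteRatio > 5 || l.score > 50):
			highest = append(highest, l)
		case l.score > 4:
			high = append(high, l)
		default:
			low = append(low, l)
		}
	}
	sort.Slice(high, func(i, j int) bool { return high[i].level < high[j].level })
	return highest, high, low
}

func main() {
	highest, high, low := classify([]levelInfo{
		{level: 0, score: 60, byteRatio: 0},
		{level: 3, score: 10, byteRatio: 8},
		{level: 4, score: 10, byteRatio: 2},
		{level: 5, score: 3, byteRatio: 1},
	})
	fmt.Println(len(highest), len(high), len(low)) // prints: 2 1 1
}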

6 changes: 3 additions & 3 deletions compaction_test.go
@@ -864,9 +864,9 @@ func TestCompaction(t *testing.T) {
// The next addition creates the fourth level-0 table, and l0CompactionTrigger == 4,
// so this triggers a non-trivial compaction into one level-1 table. Note that the
// keys in this one larger table are interleaved from the four smaller ones.
{"+E", "E", "ABCDbd."},
{"+e", "Ee", "ABCDbd."},
{"+F", "F", "ABCDbd.Ee."},
{"+E", "E", "Aa.BC.Bb.Dad."},
{"+e", "Ee", "Aa.BC.Bb.Dad."},
{"+F", "F", "ABCDEbde."},
}
for _, tc := range testCases {
if key := tc.key[1:]; tc.key[0] == '+' {
2 changes: 1 addition & 1 deletion db.go
@@ -1234,7 +1234,7 @@ func (d *DB) makeRoomForWrite(b *Batch) error {
continue
}
}
if len(d.mu.versions.currentVersion().Files[0]) >= d.opts.L0StopWritesThreshold {
if d.mu.versions.currentVersion().L0SubLevels.ReadAmplification() >= d.opts.L0StopWritesThreshold {
// There are too many level-0 files, so we wait.
if !stalled {
stalled = true
2 changes: 1 addition & 1 deletion get_iter_test.go
@@ -519,7 +519,7 @@ func TestGetIter(t *testing.T) {
v.Files[tt.level] = append(v.Files[tt.level], meta)
}

if err := v.InitL0Sublevels(cmp, base.DefaultFormatter); err != nil {
if err := v.InitL0Sublevels(cmp, base.DefaultFormatter, 10 << 20); err != nil {
t.Fatalf("desc=%q: internal error: %s", desc, err.Error())
}
err := v.CheckOrdering(cmp, base.DefaultFormatter)