Skip to content

Commit

Permalink
db: add migration to rewrite atomic compaction units
Browse files Browse the repository at this point in the history
Add a new format major version FormatSplitUserKeys that guarantees all sstables
within the table form their own atomic compaction unit. Previous versions of
Pebble (before #1470) and RocksDB allowed keys with identical user keys to be
split across multiple tables within a level. This produced a set of files known
as an 'atomic compaction unit' that must be compacted together to preserve the
LSM invariant.

This change adds a new format major version that can't be committed until all
such atomic compaction units have been recompacted to ensure that within levels
1-6, all versions of the same user key are within the same sstable. When
ratcheting the format major version to  FormatSplitUserKeys, Pebble will look
for atomic compaction units and schedule compactions, one-at-a-time, to rewrite
these multi-file atomic compaction units into multiple single-file atomic
compaction units.

Before this new format major version is committed, Pebble will also
opportunistically schedule compactions to compact these files in the background
when no other compactions may be scheduled. A new field on `pebble.Metrics`
exposes the count of such files, populated at Open and whenever one of these
compactions is attempted.

This migration will allow future versions of Pebble to remove code necessary to
handle these atomic compaction units.

Close #1495.
  • Loading branch information
jbowens committed Feb 28, 2022
1 parent 0e0d279 commit e4bbcf2
Show file tree
Hide file tree
Showing 22 changed files with 532 additions and 65 deletions.
22 changes: 10 additions & 12 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ const (
compactionKindDeleteOnly
compactionKindElisionOnly
compactionKindRead
compactionKindRewrite
)

func (k compactionKind) String() string {
Expand All @@ -301,6 +302,8 @@ func (k compactionKind) String() string {
return "elision-only"
case compactionKindRead:
return "read"
case compactionKindRewrite:
return "rewrite"
}
return "?"
}
Expand All @@ -320,9 +323,9 @@ type compaction struct {
// startLevel is the level that is being compacted. Inputs from startLevel
// and outputLevel will be merged to produce a set of outputLevel files.
startLevel *compactionLevel
// outputLevel is the level that files are being produced in. outputLevel is
// equal to startLevel+1 except when startLevel is 0 in which case it is
// equal to compactionPicker.baseLevel().
// outputLevel is the level that files are being produced in. For default
// compactions, outputLevel is equal to startLevel+1 except when startLevel
// is 0 in which case it is equal to compactionPicker.baseLevel().
outputLevel *compactionLevel

inputs []compactionLevel
Expand Down Expand Up @@ -448,15 +451,9 @@ func newCompaction(pc *pickedCompaction, opts *Options, bytesCompacted *uint64)
}
c.setupInuseKeyRanges()

switch {
case pc.readTriggered:
c.kind = compactionKindRead
case c.startLevel.level == numLevels-1:
// This compaction is an L6->L6 elision-only compaction to rewrite
// a sstable without unnecessary tombstones.
c.kind = compactionKindElisionOnly
case c.outputLevel.files.Empty() && c.startLevel.files.Len() == 1 &&
c.grandparents.SizeSum() <= c.maxOverlapBytes:
c.kind = pc.kind
if c.kind == compactionKindDefault && c.outputLevel.files.Empty() &&
c.startLevel.files.Len() == 1 && c.grandparents.SizeSum() <= c.maxOverlapBytes {
// This compaction can be converted into a trivial move from one level
// to the next. We avoid such a move if there is lots of overlapping
// grandparent data. Otherwise, the move could create a parent file
Expand Down Expand Up @@ -1548,6 +1545,7 @@ func (d *DB) maybeScheduleCompactionPicker(
}

env := compactionEnv{
nonatomicFileCount: &d.mu.compact.nonatomicFileCount,
bytesCompacted: &d.atomic.bytesCompacted,
earliestSnapshotSeqNum: d.mu.snapshots.earliest(),
earliestUnflushedSeqNum: d.getEarliestUnflushedSeqNumLocked(),
Expand Down
134 changes: 115 additions & 19 deletions compaction_picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ type compactionEnv struct {
earliestSnapshotSeqNum uint64
inProgressCompactions []compactionInfo
readCompactionEnv readCompactionEnv

// nonatomicFileCount is protected by DB.mu. It holds the most-recently
// calculated count of files that are not their own atomic compaction unit,
// as calculated by findSplitUserKey. It's a pointer so that the compaction
// picker may update the count when it performs a scan of the LSM looking
// for such a file.
nonatomicFileCount *int
}

type compactionPicker interface {
Expand All @@ -37,6 +44,7 @@ type compactionPicker interface {
pickAuto(env compactionEnv) (pc *pickedCompaction)
pickManual(env compactionEnv, manual *manualCompaction) (c *pickedCompaction, retryLater bool)
pickElisionOnlyCompaction(env compactionEnv) (pc *pickedCompaction)
pickRewriteCompaction(env compactionEnv, level int, file manifest.LevelFile) (pc *pickedCompaction)
pickReadTriggeredCompaction(env compactionEnv) (pc *pickedCompaction)
forceBaseLevel1()
}
Expand Down Expand Up @@ -103,15 +111,15 @@ type pickedCompaction struct {
// score of the chosen compaction. Taken from candidateLevelInfo.
score float64

// readTrigger is true if the compaction was triggered due to reads.
readTriggered bool
// kind indicates the kind of compaction.
kind compactionKind

// startLevel is the level that is being compacted. Inputs from startLevel
// and outputLevel will be merged to produce a set of outputLevel files.
startLevel *compactionLevel
// outputLevel is the level that files are being produced in. outputLevel is
// equal to startLevel+1 except when startLevel is 0 in which case it is
// equal to compactionPicker.baseLevel().
// outputLevel is the level that files are being produced in. In default
// compactions, outputLevel is equal to startLevel+1 except when startLevel
// is 0 in which case it is equal to compactionPicker.baseLevel().
outputLevel *compactionLevel
// adjustedOutputLevel is the output level used for the purpose of
// determining the target output file size, overlap bytes, and expanded
Expand Down Expand Up @@ -142,19 +150,23 @@ type pickedCompaction struct {
version *version
}

func newPickedCompaction(opts *Options, cur *version, startLevel, baseLevel int) *pickedCompaction {
if startLevel > 0 && startLevel < baseLevel {
panic(fmt.Sprintf("invalid compaction: start level %d should not be empty (base level %d)",
startLevel, baseLevel))
}

func defaultOutputLevel(startLevel, baseLevel int) int {
outputLevel := startLevel + 1
if startLevel == 0 {
outputLevel = baseLevel
}
if outputLevel >= numLevels-1 {
outputLevel = numLevels - 1
}
return outputLevel
}

func newPickedCompaction(opts *Options, cur *version, startLevel, outputLevel, baseLevel int) *pickedCompaction {
if startLevel > 0 && startLevel < baseLevel {
panic(fmt.Sprintf("invalid compaction: start level %d should not be empty (base level %d)",
startLevel, baseLevel))
}

// Output level is in the range [baseLevel,numLevels]. For the purpose of
// determining the target output file size, overlap bytes, and expanded
// bytes, we want to adjust the range to [1,numLevels].
Expand All @@ -177,7 +189,7 @@ func newPickedCompaction(opts *Options, cur *version, startLevel, baseLevel int)
func newPickedCompactionFromL0(
lcf *manifest.L0CompactionFiles, opts *Options, vers *version, baseLevel int, isBase bool,
) *pickedCompaction {
pc := newPickedCompaction(opts, vers, 0, baseLevel)
pc := newPickedCompaction(opts, vers, 0, baseLevel, baseLevel)
pc.lcf = lcf
if !isBase {
pc.outputLevel.level = 0
Expand Down Expand Up @@ -1053,6 +1065,29 @@ func (p *compactionPickerByScore) pickAuto(env compactionEnv) (pc *pickedCompact
*env.readCompactionEnv.rescheduleReadCompaction = true
}

// At the lowest possible compaction-picking priority, look for atomic
// compaction units that span multiple files. While current Pebble code does
// not construct such sstables, RocksDB and earlier versions of Pebble may
// have created them. These split user keys form sets of files that must be
// compacted together for correctness (referred to as "atomic compaction
// units" within the code). Rewrite them in-place.
if *env.nonatomicFileCount > 0 {
count, level, file, ok := findSplitUserKey(p.opts, p.vers)
// Save the newly established count so that it may be reported db
// DB.Metrics, and so a subsequent pickAuto doesn't need to scan the LSM
// if there no longer are any such files. NB: It's okay to mutate
// nonatomicFileCount because compaction-picking is performed while
// holding DB.mu.
*env.nonatomicFileCount = count
if !ok {
// There are no multi-file atomic compaction units in the database.
return nil
}
if pc := p.pickRewriteCompaction(env, level, file); pc != nil {
return pc
}
}

return nil
}

Expand Down Expand Up @@ -1136,13 +1171,43 @@ func (p *compactionPickerByScore) pickElisionOnlyCompaction(

// Construct a picked compaction of the elision candidate's atomic
// compaction unit.
pc = newPickedCompaction(p.opts, p.vers, numLevels-1, p.baseLevel)
pc = newPickedCompaction(p.opts, p.vers, numLevels-1, numLevels-1, p.baseLevel)
var isCompacting bool
pc.startLevel.files, isCompacting = expandToAtomicUnit(p.opts.Comparer.Compare, lf.Slice(), false /* disableIsCompacting */)
if isCompacting {
return nil
}
pc.kind = compactionKindElisionOnly
pc.smallest, pc.largest = manifest.KeyRange(pc.cmp, pc.startLevel.files.Iter())
// Fail-safe to protect against compacting the same sstable concurrently.
if !inputRangeAlreadyCompacting(env, pc) {
return pc
}
return nil
}

// pickRewriteCompaction attempts to construct a compaction that rewrites the
// provided file in the provided level. pickRewriteCompaction will pull in
// adjacent files in the file's atomic compaction unit if necessary. A rewrite
// compaction outputs files to the same level as the input level.
func (p *compactionPickerByScore) pickRewriteCompaction(env compactionEnv, level int, file manifest.LevelFile) (pc *pickedCompaction) {
// Find this file's atomic compaction unit.
atomicUnit, isCompacting := expandToAtomicUnit(
p.opts.Comparer.Compare,
file.Slice(),
false, /* disableIsCompacting */
)
if isCompacting {
return nil
}

pc = newPickedCompaction(p.opts, p.vers, level, level, p.baseLevel)
pc.outputLevel.level = level
pc.adjustedOutputLevel = 1 + level - p.baseLevel
pc.kind = compactionKindRewrite
pc.startLevel.files = atomicUnit
pc.smallest, pc.largest = manifest.KeyRange(pc.cmp, pc.startLevel.files.Iter())

// Fail-safe to protect against compacting the same sstable concurrently.
if !inputRangeAlreadyCompacting(env, pc) {
return pc
Expand All @@ -1162,7 +1227,8 @@ func pickAutoHelper(
return pickIntraL0(env, opts, vers)
}

pc = newPickedCompaction(opts, vers, cInfo.level, baseLevel)
outputLevel := defaultOutputLevel(cInfo.level, baseLevel)
pc = newPickedCompaction(opts, vers, cInfo.level, outputLevel, baseLevel)
if pc.outputLevel.level != cInfo.outputLevel {
panic("pebble: compaction picked unexpected output level")
}
Expand Down Expand Up @@ -1325,8 +1391,7 @@ func pickIntraL0(env compactionEnv, opts *Options, vers *version) (pc *pickedCom
if compactTotalCount < minIntraL0Count {
return nil
}

pc = newPickedCompaction(opts, vers, 0, 0)
pc = newPickedCompaction(opts, vers, 0, 0, 0)
pc.startLevel.files = compactFiles
pc.smallest, pc.largest = manifest.KeyRange(pc.cmp, compactFiles.Iter())
// Output only a single sstable for intra-L0 compactions. There is no current
Expand Down Expand Up @@ -1383,7 +1448,8 @@ func pickManualHelper(
baseLevel int,
diskAvailBytes func() uint64,
) (pc *pickedCompaction) {
pc = newPickedCompaction(opts, vers, manual.level, baseLevel)
outputLevel := defaultOutputLevel(manual.level, baseLevel)
pc = newPickedCompaction(opts, vers, manual.level, outputLevel, baseLevel)
manual.outputLevel = pc.outputLevel.level
cmp := opts.Comparer.Compare
pc.startLevel.files = vers.Overlaps(manual.level, cmp, manual.start.UserKey,
Expand Down Expand Up @@ -1439,7 +1505,8 @@ func pickReadTriggeredCompactionHelper(
return nil
}

pc = newPickedCompaction(p.opts, p.vers, rc.level, p.baseLevel)
outputLevel := defaultOutputLevel(rc.level, p.baseLevel)
pc = newPickedCompaction(p.opts, p.vers, rc.level, outputLevel, p.baseLevel)

pc.startLevel.files = overlapSlice
if !pc.setupInputs(p.opts, p.diskAvailBytes()) {
Expand All @@ -1448,7 +1515,7 @@ func pickReadTriggeredCompactionHelper(
if inputRangeAlreadyCompacting(env, pc) {
return nil
}
pc.readTriggered = true
pc.kind = compactionKindRead

// Prevent read compactions which are too wide.
outputOverlaps := pc.version.Overlaps(
Expand Down Expand Up @@ -1544,3 +1611,32 @@ func conflictsWithInProgress(
}
return false
}

// findSplitUserKey scans the LSM's levels 1 through 6 for adjacent files that
// contain the same user key. Such arrangements of files were permitted in
// RocksDB and in Pebble up to SHA a860bbad. findSplitUserKey returns the count
// of such files (may double-count an atomic compaction unit) and it returns the
// level and LevelFile for one such file.
func findSplitUserKey(opts *Options, vers *version) (count, level int, file manifest.LevelFile, ok bool) {
equal := opts.equal()
for l := numLevels - 1; l > 0; l-- {
iter := vers.Levels[l].Iter()

var userKey []byte
for f := iter.First(); f != nil; f = iter.Next() {
if userKey != nil && equal(userKey, f.Smallest.UserKey) {
if !ok {
// First eligible file found.
level, file, ok = l, iter.Take(), true
}
count++
}
if f.Largest.IsExclusiveSentinel() {
userKey = nil
} else {
userKey = f.Largest.UserKey
}
}
}
return count, level, file, ok
}
6 changes: 6 additions & 0 deletions compaction_picker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ func TestCompactionPickerTargetLevel(t *testing.T) {
var inProgress []compactionInfo
for {
env := compactionEnv{
nonatomicFileCount: new(int),
earliestUnflushedSeqNum: InternalKeySeqNumMax,
inProgressCompactions: inProgress,
}
Expand Down Expand Up @@ -268,6 +269,7 @@ func TestCompactionPickerTargetLevel(t *testing.T) {
}

pc := pickerByScore.pickAuto(compactionEnv{
nonatomicFileCount: new(int),
earliestUnflushedSeqNum: InternalKeySeqNumMax,
inProgressCompactions: inProgress,
})
Expand Down Expand Up @@ -610,6 +612,7 @@ func TestCompactionPickerL0(t *testing.T) {
}

pc = picker.pickAuto(compactionEnv{
nonatomicFileCount: new(int),
bytesCompacted: new(uint64),
earliestUnflushedSeqNum: math.MaxUint64,
inProgressCompactions: inProgressCompactions,
Expand Down Expand Up @@ -833,6 +836,7 @@ func TestCompactionPickerConcurrency(t *testing.T) {
}

pc := picker.pickAuto(compactionEnv{
nonatomicFileCount: new(int),
bytesCompacted: new(uint64),
earliestUnflushedSeqNum: math.MaxUint64,
inProgressCompactions: inProgressCompactions,
Expand Down Expand Up @@ -982,6 +986,7 @@ func TestCompactionPickerPickReadTriggered(t *testing.T) {

case "pick-auto":
pc := picker.pickAuto(compactionEnv{
nonatomicFileCount: new(int),
bytesCompacted: new(uint64),
earliestUnflushedSeqNum: math.MaxUint64,
readCompactionEnv: readCompactionEnv{
Expand Down Expand Up @@ -1293,6 +1298,7 @@ func TestCompactionOutputFileSize(t *testing.T) {

case "pick-auto":
pc := picker.pickAuto(compactionEnv{
nonatomicFileCount: new(int),
bytesCompacted: new(uint64),
earliestUnflushedSeqNum: math.MaxUint64,
earliestSnapshotSeqNum: math.MaxUint64,
Expand Down
8 changes: 7 additions & 1 deletion compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ func (p *compactionPickerForTesting) pickElisionOnlyCompaction(
return nil
}

func (p *compactionPickerForTesting) pickRewriteCompaction(
env compactionEnv, level int, file manifest.LevelFile,
) (pc *pickedCompaction) {
return nil
}

func (p *compactionPickerForTesting) pickManual(
env compactionEnv, manual *manualCompaction,
) (pc *pickedCompaction, retryLater bool) {
Expand Down Expand Up @@ -1566,7 +1572,7 @@ func TestCompactionOutputLevel(t *testing.T) {
var start, base int
d.ScanArgs(t, "start", &start)
d.ScanArgs(t, "base", &base)
pc := newPickedCompaction(opts, version, start, base)
pc := newPickedCompaction(opts, version, start, defaultOutputLevel(start, base), base)
c := newCompaction(pc, opts, new(uint64))
return fmt.Sprintf("output=%d\nmax-output-file-size=%d\n",
c.outputLevel.level, c.maxOutputFileSize)
Expand Down
Loading

0 comments on commit e4bbcf2

Please sign in to comment.