Skip to content

Commit

Permalink
db: add migration to rewrite atomic compaction units
Browse files Browse the repository at this point in the history
Add a new format major version FormatSplitUserKeys that guarantees all sstables
within the table form their own atomic compaction unit. Previous versions of
Pebble (before #1470) and RocksDB allowed keys with identical user keys to be
split across multiple tables within a level. This produced a set of files known
as an 'atomic compaction unit' that must be compacted together to preserve the
LSM invariant.

This change adds a new format major version that can't be committed until all
such atomic compaction units have been recompacted to ensure that within levels
1-6, all versions of the same user key are within the same sstable. When
ratcheting the format major version to  FormatSplitUserKeys, Pebble will look
for atomic compaction units and schedule compactions, one-at-a-time, to rewrite
these multi-file atomic compaction units into multiple single-file atomic
compaction units.

This migration will allow future versions of Pebble to remove code necessary to
handle these atomic compaction units.

Close #1495.
  • Loading branch information
jbowens committed Feb 28, 2022
1 parent 0e0d279 commit fb3e90d
Show file tree
Hide file tree
Showing 19 changed files with 429 additions and 66 deletions.
21 changes: 9 additions & 12 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ const (
compactionKindDeleteOnly
compactionKindElisionOnly
compactionKindRead
compactionKindRewrite
)

func (k compactionKind) String() string {
Expand All @@ -301,6 +302,8 @@ func (k compactionKind) String() string {
return "elision-only"
case compactionKindRead:
return "read"
case compactionKindRewrite:
return "rewrite"
}
return "?"
}
Expand All @@ -320,9 +323,9 @@ type compaction struct {
// startLevel is the level that is being compacted. Inputs from startLevel
// and outputLevel will be merged to produce a set of outputLevel files.
startLevel *compactionLevel
// outputLevel is the level that files are being produced in. outputLevel is
// equal to startLevel+1 except when startLevel is 0 in which case it is
// equal to compactionPicker.baseLevel().
// outputLevel is the level that files are being produced in. For default
// compactions, outputLevel is equal to startLevel+1 except when startLevel
// is 0 in which case it is equal to compactionPicker.baseLevel().
outputLevel *compactionLevel

inputs []compactionLevel
Expand Down Expand Up @@ -448,15 +451,9 @@ func newCompaction(pc *pickedCompaction, opts *Options, bytesCompacted *uint64)
}
c.setupInuseKeyRanges()

switch {
case pc.readTriggered:
c.kind = compactionKindRead
case c.startLevel.level == numLevels-1:
// This compaction is an L6->L6 elision-only compaction to rewrite
// a sstable without unnecessary tombstones.
c.kind = compactionKindElisionOnly
case c.outputLevel.files.Empty() && c.startLevel.files.Len() == 1 &&
c.grandparents.SizeSum() <= c.maxOverlapBytes:
c.kind = pc.kind
if c.kind == compactionKindDefault && c.outputLevel.files.Empty() &&
c.startLevel.files.Len() == 1 && c.grandparents.SizeSum() <= c.maxOverlapBytes {
// This compaction can be converted into a trivial move from one level
// to the next. We avoid such a move if there is lots of overlapping
// grandparent data. Otherwise, the move could create a parent file
Expand Down
76 changes: 56 additions & 20 deletions compaction_picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type compactionPicker interface {
pickAuto(env compactionEnv) (pc *pickedCompaction)
pickManual(env compactionEnv, manual *manualCompaction) (c *pickedCompaction, retryLater bool)
pickElisionOnlyCompaction(env compactionEnv) (pc *pickedCompaction)
pickRewriteCompaction(env compactionEnv, level int, file manifest.LevelFile) (pc *pickedCompaction)
pickReadTriggeredCompaction(env compactionEnv) (pc *pickedCompaction)
forceBaseLevel1()
}
Expand Down Expand Up @@ -103,15 +104,15 @@ type pickedCompaction struct {
// score of the chosen compaction. Taken from candidateLevelInfo.
score float64

// readTrigger is true if the compaction was triggered due to reads.
readTriggered bool
// kind indicates the kind of compaction.
kind compactionKind

// startLevel is the level that is being compacted. Inputs from startLevel
// and outputLevel will be merged to produce a set of outputLevel files.
startLevel *compactionLevel
// outputLevel is the level that files are being produced in. outputLevel is
// equal to startLevel+1 except when startLevel is 0 in which case it is
// equal to compactionPicker.baseLevel().
// outputLevel is the level that files are being produced in. In default
// compactions, outputLevel is equal to startLevel+1 except when startLevel
// is 0 in which case it is equal to compactionPicker.baseLevel().
outputLevel *compactionLevel
// adjustedOutputLevel is the output level used for the purpose of
// determining the target output file size, overlap bytes, and expanded
Expand Down Expand Up @@ -142,19 +143,23 @@ type pickedCompaction struct {
version *version
}

func newPickedCompaction(opts *Options, cur *version, startLevel, baseLevel int) *pickedCompaction {
if startLevel > 0 && startLevel < baseLevel {
panic(fmt.Sprintf("invalid compaction: start level %d should not be empty (base level %d)",
startLevel, baseLevel))
}

func defaultOutputLevel(startLevel, baseLevel int) int {
outputLevel := startLevel + 1
if startLevel == 0 {
outputLevel = baseLevel
}
if outputLevel >= numLevels-1 {
outputLevel = numLevels - 1
}
return outputLevel
}

func newPickedCompaction(opts *Options, cur *version, startLevel, outputLevel, baseLevel int) *pickedCompaction {
if startLevel > 0 && startLevel < baseLevel {
panic(fmt.Sprintf("invalid compaction: start level %d should not be empty (base level %d)",
startLevel, baseLevel))
}

// Output level is in the range [baseLevel,numLevels]. For the purpose of
// determining the target output file size, overlap bytes, and expanded
// bytes, we want to adjust the range to [1,numLevels].
Expand All @@ -177,7 +182,7 @@ func newPickedCompaction(opts *Options, cur *version, startLevel, baseLevel int)
func newPickedCompactionFromL0(
lcf *manifest.L0CompactionFiles, opts *Options, vers *version, baseLevel int, isBase bool,
) *pickedCompaction {
pc := newPickedCompaction(opts, vers, 0, baseLevel)
pc := newPickedCompaction(opts, vers, 0, baseLevel, baseLevel)
pc.lcf = lcf
if !isBase {
pc.outputLevel.level = 0
Expand Down Expand Up @@ -1052,7 +1057,6 @@ func (p *compactionPickerByScore) pickAuto(env compactionEnv) (pc *pickedCompact
if env.readCompactionEnv.rescheduleReadCompaction != nil {
*env.readCompactionEnv.rescheduleReadCompaction = true
}

return nil
}

Expand Down Expand Up @@ -1136,12 +1140,13 @@ func (p *compactionPickerByScore) pickElisionOnlyCompaction(

// Construct a picked compaction of the elision candidate's atomic
// compaction unit.
pc = newPickedCompaction(p.opts, p.vers, numLevels-1, p.baseLevel)
pc = newPickedCompaction(p.opts, p.vers, numLevels-1, numLevels-1, p.baseLevel)
var isCompacting bool
pc.startLevel.files, isCompacting = expandToAtomicUnit(p.opts.Comparer.Compare, lf.Slice(), false /* disableIsCompacting */)
if isCompacting {
return nil
}
pc.kind = compactionKindElisionOnly
pc.smallest, pc.largest = manifest.KeyRange(pc.cmp, pc.startLevel.files.Iter())
// Fail-safe to protect against compacting the same sstable concurrently.
if !inputRangeAlreadyCompacting(env, pc) {
Expand All @@ -1150,6 +1155,35 @@ func (p *compactionPickerByScore) pickElisionOnlyCompaction(
return nil
}

// pickRewriteCompaction attempts to construct a compaction that rewrites the
// provided file in the provided level. pickRewriteCompaction will pull in
// adjacent files in the file's atomic compaction unit if necessary. A rewrite
// compaction outputs files to the same level as the input level.
func (p *compactionPickerByScore) pickRewriteCompaction(env compactionEnv, level int, file manifest.LevelFile) (pc *pickedCompaction) {
// Find this file's atomic compaction unit.
atomicUnit, isCompacting := expandToAtomicUnit(
p.opts.Comparer.Compare,
file.Slice(),
false, /* disableIsCompacting */
)
if isCompacting {
return nil
}

pc = newPickedCompaction(p.opts, p.vers, level, level, p.baseLevel)
pc.outputLevel.level = level
pc.adjustedOutputLevel = 1 + level - p.baseLevel
pc.kind = compactionKindRewrite
pc.startLevel.files = atomicUnit
pc.smallest, pc.largest = manifest.KeyRange(pc.cmp, pc.startLevel.files.Iter())

// Fail-safe to protect against compacting the same sstable concurrently.
if !inputRangeAlreadyCompacting(env, pc) {
return pc
}
return nil
}

func pickAutoHelper(
env compactionEnv,
opts *Options,
Expand All @@ -1162,7 +1196,8 @@ func pickAutoHelper(
return pickIntraL0(env, opts, vers)
}

pc = newPickedCompaction(opts, vers, cInfo.level, baseLevel)
outputLevel := defaultOutputLevel(cInfo.level, baseLevel)
pc = newPickedCompaction(opts, vers, cInfo.level, outputLevel, baseLevel)
if pc.outputLevel.level != cInfo.outputLevel {
panic("pebble: compaction picked unexpected output level")
}
Expand Down Expand Up @@ -1325,8 +1360,7 @@ func pickIntraL0(env compactionEnv, opts *Options, vers *version) (pc *pickedCom
if compactTotalCount < minIntraL0Count {
return nil
}

pc = newPickedCompaction(opts, vers, 0, 0)
pc = newPickedCompaction(opts, vers, 0, 0, 0)
pc.startLevel.files = compactFiles
pc.smallest, pc.largest = manifest.KeyRange(pc.cmp, compactFiles.Iter())
// Output only a single sstable for intra-L0 compactions. There is no current
Expand Down Expand Up @@ -1383,7 +1417,8 @@ func pickManualHelper(
baseLevel int,
diskAvailBytes func() uint64,
) (pc *pickedCompaction) {
pc = newPickedCompaction(opts, vers, manual.level, baseLevel)
outputLevel := defaultOutputLevel(manual.level, baseLevel)
pc = newPickedCompaction(opts, vers, manual.level, outputLevel, baseLevel)
manual.outputLevel = pc.outputLevel.level
cmp := opts.Comparer.Compare
pc.startLevel.files = vers.Overlaps(manual.level, cmp, manual.start.UserKey,
Expand Down Expand Up @@ -1439,7 +1474,8 @@ func pickReadTriggeredCompactionHelper(
return nil
}

pc = newPickedCompaction(p.opts, p.vers, rc.level, p.baseLevel)
outputLevel := defaultOutputLevel(rc.level, p.baseLevel)
pc = newPickedCompaction(p.opts, p.vers, rc.level, outputLevel, p.baseLevel)

pc.startLevel.files = overlapSlice
if !pc.setupInputs(p.opts, p.diskAvailBytes()) {
Expand All @@ -1448,7 +1484,7 @@ func pickReadTriggeredCompactionHelper(
if inputRangeAlreadyCompacting(env, pc) {
return nil
}
pc.readTriggered = true
pc.kind = compactionKindRead

// Prevent read compactions which are too wide.
outputOverlaps := pc.version.Overlaps(
Expand Down
8 changes: 7 additions & 1 deletion compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ func (p *compactionPickerForTesting) pickElisionOnlyCompaction(
return nil
}

func (p *compactionPickerForTesting) pickRewriteCompaction(
env compactionEnv, level int, file manifest.LevelFile,
) (pc *pickedCompaction) {
return nil
}

func (p *compactionPickerForTesting) pickManual(
env compactionEnv, manual *manualCompaction,
) (pc *pickedCompaction, retryLater bool) {
Expand Down Expand Up @@ -1566,7 +1572,7 @@ func TestCompactionOutputLevel(t *testing.T) {
var start, base int
d.ScanArgs(t, "start", &start)
d.ScanArgs(t, "base", &base)
pc := newPickedCompaction(opts, version, start, base)
pc := newPickedCompaction(opts, version, start, defaultOutputLevel(start, base), base)
c := newCompaction(pc, opts, new(uint64))
return fmt.Sprintf("output=%d\nmax-output-file-size=%d\n",
c.outputLevel.level, c.maxOutputFileSize)
Expand Down
28 changes: 28 additions & 0 deletions data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -812,6 +812,34 @@ func runIngestCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error {
return nil
}

func runForceIngestCmd(td *datadriven.TestData, d *DB) error {
var paths []string
var level int
for _, arg := range td.CmdArgs {
switch arg.Key {
case "paths":
paths = append(paths, arg.Vals...)
case "level":
var err error
level, err = strconv.Atoi(arg.Vals[0])
if err != nil {
return err
}
}
}
return d.ingest(paths, func(
tableNewIters,
IterOptions,
Compare,
*version,
int,
map[*compaction]struct{},
*fileMetadata,
) (int, error) {
return level, nil
})
}

func runLSMCmd(td *datadriven.TestData, d *DB) string {
d.mu.Lock()
s := d.mu.versions.currentVersion().DebugString(base.DefaultFormatter)
Expand Down
Loading

0 comments on commit fb3e90d

Please sign in to comment.