Skip to content

Commit

Permalink
db: add migration to rewrite atomic compaction units
Browse files Browse the repository at this point in the history
Add two new format major versions FormatSplitUserKeysMarked and
FormatMarkCompacted that together may be used to guarantee that all sstables
within the table form their own atomic compaction unit. Previous versions of
Pebble (before #1470) and RocksDB allowed keys with identical user keys to be
split across multiple tables within a level. This produced a set of files known
as an 'atomic compaction unit' that must be compacted together to preserve the
LSM invariant.

The first format major version FormatSplitUserKeysMarked may be used to
guarantee that every file that is member of a multi-file atomic compaction unit
is marked for compaction. The 'marked-for-compaction' file metadata field is
currently unused by Pebble, but is repurposed here to record the intent to
recompact these files with split user keys. If ratcheting to
FormatSplitUserKeysMarked discovers files with split user keys, it marks them
for compaction and then rotates the manifest to ensure that the updated file
metadata is persisted durably.

This commit introduces a new rewrite compaction type. During compaction picking
if no other productive compaction is picked, the compaction picker looks for
files marked for compaction. It uses a manifest.Annotator to avoid a linear
search through the file metadata. If a marked file exists, it picks a
compaction that outputs into the file's existing level, pulling in its atomic
compaction unit if necessary.

The second format major version FormatMarkCompacted is used to guarantee that
no files that are marked for compaction exist. This may be used in a subequent
CockroachDB release (22.2) to ensure that all files marked for compaction by
the FormatSplitUserKeysMarked format major version have been compacted away.
Ratcheting to this format major version blocks until all the marked files are
compacted.

Together these format major versions will allow us to remove code necessary to
handle these atomic compaction units, when we increase the minimum format major
version supported by Pebble.

Close #1495.
  • Loading branch information
jbowens committed Mar 10, 2022
1 parent 6cebaf5 commit 93d4722
Show file tree
Hide file tree
Showing 30 changed files with 997 additions and 104 deletions.
13 changes: 8 additions & 5 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ const (
compactionKindDeleteOnly
compactionKindElisionOnly
compactionKindRead
compactionKindRewrite
)

func (k compactionKind) String() string {
Expand All @@ -301,6 +302,8 @@ func (k compactionKind) String() string {
return "elision-only"
case compactionKindRead:
return "read"
case compactionKindRewrite:
return "rewrite"
}
return "?"
}
Expand All @@ -320,9 +323,9 @@ type compaction struct {
// startLevel is the level that is being compacted. Inputs from startLevel
// and outputLevel will be merged to produce a set of outputLevel files.
startLevel *compactionLevel
// outputLevel is the level that files are being produced in. outputLevel is
// equal to startLevel+1 except when startLevel is 0 in which case it is
// equal to compactionPicker.baseLevel().
// outputLevel is the level that files are being produced in. For default
// compactions, outputLevel is equal to startLevel+1 except when startLevel
// is 0 in which case it is equal to compactionPicker.baseLevel().
outputLevel *compactionLevel

inputs []compactionLevel
Expand Down Expand Up @@ -1433,7 +1436,7 @@ func (d *DB) flush1() error {
}

d.mu.versions.logLock()
err = d.mu.versions.logAndApply(jobID, ve, c.metrics,
err = d.mu.versions.logAndApply(jobID, ve, c.metrics, false, /* forceRotation */
func() []compactionInfo { return d.getInProgressCompactionInfoLocked(c) })
if err != nil {
// TODO(peter): untested.
Expand Down Expand Up @@ -1857,7 +1860,7 @@ func (d *DB) compact1(c *compaction, errChannel chan error) (err error) {
info.Duration = d.timeNow().Sub(startTime)
if err == nil {
d.mu.versions.logLock()
err = d.mu.versions.logAndApply(jobID, ve, c.metrics, func() []compactionInfo {
err = d.mu.versions.logAndApply(jobID, ve, c.metrics, false /* forceRotation */, func() []compactionInfo {
return d.getInProgressCompactionInfoLocked(c)
})
if err != nil {
Expand Down
126 changes: 123 additions & 3 deletions compaction_picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type compactionPicker interface {
pickAuto(env compactionEnv) (pc *pickedCompaction)
pickManual(env compactionEnv, manual *manualCompaction) (c *pickedCompaction, retryLater bool)
pickElisionOnlyCompaction(env compactionEnv) (pc *pickedCompaction)
pickRewriteCompaction(env compactionEnv) (pc *pickedCompaction)
pickReadTriggeredCompaction(env compactionEnv) (pc *pickedCompaction)
forceBaseLevel1()
}
Expand Down Expand Up @@ -109,9 +110,9 @@ type pickedCompaction struct {
// startLevel is the level that is being compacted. Inputs from startLevel
// and outputLevel will be merged to produce a set of outputLevel files.
startLevel *compactionLevel
// outputLevel is the level that files are being produced in. outputLevel is
// equal to startLevel+1 except when startLevel is 0 in which case it is
// equal to compactionPicker.baseLevel().
// outputLevel is the level that files are being produced in. In default
// compactions, outputLevel is equal to startLevel+1 except when startLevel
// is 0 in which case it is equal to compactionPicker.baseLevel().
outputLevel *compactionLevel
// adjustedOutputLevel is the output level used for the purpose of
// determining the target output file size, overlap bytes, and expanded
Expand Down Expand Up @@ -1066,6 +1067,25 @@ func (p *compactionPickerByScore) pickAuto(env compactionEnv) (pc *pickedCompact
*env.readCompactionEnv.rescheduleReadCompaction = true
}

// At the lowest possible compaction-picking priority, look for files marked
// for compaction. Pebble will mark files for compaction if they have atomic
// compaction units that span multiple files. While current Pebble code does
// not construct such sstables, RocksDB and earlier versions of Pebble may
// have created them. These split user keys form sets of files that must be
// compacted together for correctness (referred to as "atomic compaction
// units" within the code). Rewrite them in-place.
//
// It's also possible that a file may have been marked for compaction by
// even earlier versions of Pebble code, since FileMetadata's
// MarkedForCompaction field is persisted in the manifest. That's okay. We
// previously would've ignored the designation, whereas now we'll re-compact
// the file in place.
if p.vers.Stats.MarkedForCompaction > 0 {
if pc := p.pickRewriteCompaction(env); pc != nil {
return pc
}
}

return nil
}

Expand Down Expand Up @@ -1129,6 +1149,46 @@ func (a elisionOnlyAnnotator) Merge(v interface{}, accum interface{}) interface{
return accumV
}

// markedForCompactionAnnotator implements the manifest.Annotator interface,
// annotating B-Tree nodes with the *fileMetadata of a file that is marked for
// compaction within the subtree. If multiple files meet the criteria, it
// chooses whichever file has the lowest LargestSeqNum.
type markedForCompactionAnnotator struct{}

var _ manifest.Annotator = markedForCompactionAnnotator{}

func (a markedForCompactionAnnotator) Zero(interface{}) interface{} {
return nil
}

func (a markedForCompactionAnnotator) Accumulate(
f *fileMetadata, dst interface{},
) (interface{}, bool) {
if !f.MarkedForCompaction {
// Not marked for compaction; return dst.
return dst, true
}
return markedMergeHelper(f, dst)
}

func (a markedForCompactionAnnotator) Merge(v interface{}, accum interface{}) interface{} {
if v == nil {
return accum
}
accum, _ = markedMergeHelper(v.(*fileMetadata), accum)
return accum
}

// REQUIRES: f is non-nil, and f.MarkedForCompaction=true.
func markedMergeHelper(f *fileMetadata, dst interface{}) (interface{}, bool) {
if dst == nil {
return f, true
} else if dstV := dst.(*fileMetadata); dstV.LargestSeqNum > f.LargestSeqNum {
return f, true
}
return dst, true
}

// pickElisionOnlyCompaction looks for compactions of sstables in the
// bottommost level containing obsolete records that may now be dropped.
func (p *compactionPickerByScore) pickElisionOnlyCompaction(
Expand Down Expand Up @@ -1164,6 +1224,66 @@ func (p *compactionPickerByScore) pickElisionOnlyCompaction(
return nil
}

// pickRewriteCompaction attempts to construct a compaction that
// rewrites a file marked for compaction. pickRewriteCompaction will
// pull in adjacent files in the file's atomic compaction unit if
// necessary. A rewrite compaction outputs files to the same level as
// the input level.
func (p *compactionPickerByScore) pickRewriteCompaction(env compactionEnv) (pc *pickedCompaction) {
for l := numLevels - 1; l >= 0; l-- {
v := p.vers.Levels[l].Annotation(markedForCompactionAnnotator{})
if v == nil {
// Try the next level.
continue
}
candidate := v.(*fileMetadata)
if candidate.Compacting {
// Try the next level.
continue
}
lf := p.vers.Levels[l].Find(p.opts.Comparer.Compare, candidate)
if lf == nil {
panic(fmt.Sprintf("file %s not found in level %d as expected", candidate.FileNum, numLevels-1))
}

inputs := lf.Slice()
// L0 files generated by a flush have never been split such that
// adjacent files can contain the same user key. So we do not need to
// rewrite an atomic compaction unit for L0. Note that there is nothing
// preventing two different flushes from producing files that are
// non-overlapping from an InternalKey perspective, but span the same
// user key. However, such files cannot be in the same L0 sublevel,
// since each sublevel requires non-overlapping user keys (unlike other
// levels).
if l > 0 {
// Find this file's atomic compaction unit. This is only relevant
// for levels L1+.
var isCompacting bool
inputs, isCompacting = expandToAtomicUnit(
p.opts.Comparer.Compare,
inputs,
false, /* disableIsCompacting */
)
if isCompacting {
// Try the next level.
continue
}
}

pc = newPickedCompaction(p.opts, p.vers, l, l, p.baseLevel)
pc.outputLevel.level = l
pc.kind = compactionKindRewrite
pc.startLevel.files = inputs
pc.smallest, pc.largest = manifest.KeyRange(pc.cmp, pc.startLevel.files.Iter())

// Fail-safe to protect against compacting the same sstable concurrently.
if !inputRangeAlreadyCompacting(env, pc) {
return pc
}
}
return nil
}

// pickAutoLPositive picks an automatic compaction for the candidate
// file in a positive-numbered level. This function must not be used for
// L0.
Expand Down
16 changes: 16 additions & 0 deletions compaction_picker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,22 @@ func TestCompactionPickerL0(t *testing.T) {
return "nil"
}
return result.String()
case "mark-for-compaction":
var fileNum uint64
td.ScanArgs(t, "file", &fileNum)
for l, lm := range picker.vers.Levels {
iter := lm.Iter()
for f := iter.First(); f != nil; f = iter.Next() {
if f.FileNum != base.FileNum(fileNum) {
continue
}
f.MarkedForCompaction = true
picker.vers.Stats.MarkedForCompaction++
picker.vers.Levels[l].InvalidateAnnotation(markedForCompactionAnnotator{})
return fmt.Sprintf("marked L%d.%s", l, f.FileNum)
}
}
return "not-found"
case "max-output-file-size":
if pc == nil {
return "no compaction"
Expand Down
107 changes: 107 additions & 0 deletions compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ func (p *compactionPickerForTesting) pickElisionOnlyCompaction(
return nil
}

func (p *compactionPickerForTesting) pickRewriteCompaction(
env compactionEnv,
) (pc *pickedCompaction) {
return nil
}

func (p *compactionPickerForTesting) pickManual(
env compactionEnv, manual *manualCompaction,
) (pc *pickedCompaction, retryLater bool) {
Expand Down Expand Up @@ -3348,3 +3354,104 @@ func Test_calculateInuseKeyRanges(t *testing.T) {
})
}
}

func TestMarkedForCompaction(t *testing.T) {
var mem vfs.FS = vfs.NewMem()
var d *DB
defer func() {
if d != nil {
require.NoError(t, d.Close())
}
}()

var buf bytes.Buffer
opts := &Options{
FS: mem,
DebugCheck: DebugCheckLevels,
DisableAutomaticCompactions: true,
FormatMajorVersion: FormatNewest,
EventListener: EventListener{
CompactionEnd: func(info CompactionInfo) {
info.JobID = 100 // Fix to avoid nondeterminism.
fmt.Fprintln(&buf, info)
},
},
}

reset := func() {
if d != nil {
require.NoError(t, d.Close())
}
mem = vfs.NewMem()
require.NoError(t, mem.MkdirAll("ext", 0755))

var err error
d, err = Open("", opts)
require.NoError(t, err)
}
datadriven.RunTest(t, "testdata/marked_for_compaction", func(td *datadriven.TestData) string {
switch td.Cmd {
case "reset":
reset()
return ""

case "define":
if d != nil {
if err := d.Close(); err != nil {
return err.Error()
}
}
var err error
if d, err = runDBDefineCmd(td, opts); err != nil {
return err.Error()
}
d.mu.Lock()
defer d.mu.Unlock()
t := time.Now()
d.timeNow = func() time.Time {
t = t.Add(time.Second)
return t
}
s := d.mu.versions.currentVersion().DebugString(base.DefaultFormatter)
return s

case "mark-for-compaction":
d.mu.Lock()
defer d.mu.Unlock()
vers := d.mu.versions.currentVersion()
var fileNum uint64
td.ScanArgs(t, "file", &fileNum)
for l, lm := range vers.Levels {
iter := lm.Iter()
for f := iter.First(); f != nil; f = iter.Next() {
if f.FileNum != base.FileNum(fileNum) {
continue
}
f.MarkedForCompaction = true
vers.Stats.MarkedForCompaction++
vers.Levels[l].InvalidateAnnotation(markedForCompactionAnnotator{})
return fmt.Sprintf("marked L%d.%s", l, f.FileNum)
}
}
return "not-found"

case "maybe-compact":
d.mu.Lock()
defer d.mu.Unlock()
d.opts.DisableAutomaticCompactions = false
d.maybeScheduleCompaction()
for d.mu.compact.compactingCount > 0 {
d.mu.compact.cond.Wait()
}

fmt.Fprintln(&buf, d.mu.versions.currentVersion().DebugString(base.DefaultFormatter))
s := strings.TrimSpace(buf.String())
buf.Reset()
opts.DisableAutomaticCompactions = true
return s

default:
return fmt.Sprintf("unknown command: %s", td.Cmd)
}
})
}
30 changes: 29 additions & 1 deletion data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -729,7 +729,7 @@ func runDBDefineCmd(td *datadriven.TestData, opts *Options) (*DB, error) {
jobID := d.mu.nextJobID
d.mu.nextJobID++
d.mu.versions.logLock()
if err := d.mu.versions.logAndApply(jobID, ve, newFileMetrics(ve.NewFiles), func() []compactionInfo {
if err := d.mu.versions.logAndApply(jobID, ve, newFileMetrics(ve.NewFiles), false, func() []compactionInfo {
return nil
}); err != nil {
return nil, err
Expand Down Expand Up @@ -822,6 +822,34 @@ func runIngestCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error {
return nil
}

func runForceIngestCmd(td *datadriven.TestData, d *DB) error {
var paths []string
var level int
for _, arg := range td.CmdArgs {
switch arg.Key {
case "paths":
paths = append(paths, arg.Vals...)
case "level":
var err error
level, err = strconv.Atoi(arg.Vals[0])
if err != nil {
return err
}
}
}
return d.ingest(paths, func(
tableNewIters,
IterOptions,
Compare,
*version,
int,
map[*compaction]struct{},
*fileMetadata,
) (int, error) {
return level, nil
})
}

func runLSMCmd(td *datadriven.TestData, d *DB) string {
d.mu.Lock()
s := d.mu.versions.currentVersion().String()
Expand Down
Loading

0 comments on commit 93d4722

Please sign in to comment.