Skip to content

Commit

Permalink
db: add migration to rewrite atomic compaction units
Browse files Browse the repository at this point in the history
Add two new format major versions FormatSplitUserKeysMarked and
FormatMarkCompacted that together may be used to guarantee that all sstables
within the table form their own atomic compaction unit. Previous versions of
Pebble (before #1470) and RocksDB allowed keys with identical user keys to be
split across multiple tables within a level. This produced a set of files known
as an 'atomic compaction unit' that must be compacted together to preserve the
LSM invariant.

The first format major version FormatSplitUserKeysMarked may be used to
guarantee that every file that is member to a multi-file atomic compaction unit
is marked for compaction. The 'marked-for-compaction' file metadata field is
currently unused by Pebble, but is repurposed here to record the intent to
recompact these files with split user keys. If ratcheting to
FormatSplitUserKeysMarked discovers files with split user keys, it marks them
for compaction and then rotates the manifest to ensure that the updated file
metadata is persisted durably.

This commit introduces a new rewrite compaction type. During compaction picking
if no other productive compaction is picked, the compaction picker looks for
files marked for compaction. It uses a manifest.Annotator to avoid a linear
search through the file metadata. If a marked file exists, it picks a
compaction that outputs into the file's existing level, pulling in its atomic
compaction unit if necessary.

The second format major version FormatMarkCompacted is used to guarantee that
no files that are marked for compaction exist. This may be used in a subequent
CockroachDB release (22.2) to ensure that all files marked for compaction by
the FormatSplitUserKeysMarked format major version have been compacted away.
Ratcheting to this format major version blocks until all the marked files are
compacted.

Together these format major versions will allow us to remove code necessary to
handle these atomic compaction units, when we increase the minimum format major
version supported by Pebble.

Close #1495.
  • Loading branch information
jbowens committed Mar 3, 2022
1 parent 9d0c391 commit 59caac4
Show file tree
Hide file tree
Showing 27 changed files with 775 additions and 95 deletions.
13 changes: 8 additions & 5 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ const (
compactionKindDeleteOnly
compactionKindElisionOnly
compactionKindRead
compactionKindRewrite
)

func (k compactionKind) String() string {
Expand All @@ -301,6 +302,8 @@ func (k compactionKind) String() string {
return "elision-only"
case compactionKindRead:
return "read"
case compactionKindRewrite:
return "rewrite"
}
return "?"
}
Expand All @@ -320,9 +323,9 @@ type compaction struct {
// startLevel is the level that is being compacted. Inputs from startLevel
// and outputLevel will be merged to produce a set of outputLevel files.
startLevel *compactionLevel
// outputLevel is the level that files are being produced in. outputLevel is
// equal to startLevel+1 except when startLevel is 0 in which case it is
// equal to compactionPicker.baseLevel().
// outputLevel is the level that files are being produced in. For default
// compactions, outputLevel is equal to startLevel+1 except when startLevel
// is 0 in which case it is equal to compactionPicker.baseLevel().
outputLevel *compactionLevel

inputs []compactionLevel
Expand Down Expand Up @@ -1433,7 +1436,7 @@ func (d *DB) flush1() error {
}

d.mu.versions.logLock()
err = d.mu.versions.logAndApply(jobID, ve, c.metrics,
err = d.mu.versions.logAndApply(jobID, ve, c.metrics, false, /* forceRotation */
func() []compactionInfo { return d.getInProgressCompactionInfoLocked(c) })
if err != nil {
// TODO(peter): untested.
Expand Down Expand Up @@ -1857,7 +1860,7 @@ func (d *DB) compact1(c *compaction, errChannel chan error) (err error) {
info.Duration = d.timeNow().Sub(startTime)
if err == nil {
d.mu.versions.logLock()
err = d.mu.versions.logAndApply(jobID, ve, c.metrics, func() []compactionInfo {
err = d.mu.versions.logAndApply(jobID, ve, c.metrics, false /* forceRotation */, func() []compactionInfo {
return d.getInProgressCompactionInfoLocked(c)
})
if err != nil {
Expand Down
122 changes: 119 additions & 3 deletions compaction_picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type compactionPicker interface {
pickAuto(env compactionEnv) (pc *pickedCompaction)
pickManual(env compactionEnv, manual *manualCompaction) (c *pickedCompaction, retryLater bool)
pickElisionOnlyCompaction(env compactionEnv) (pc *pickedCompaction)
pickRewriteCompaction(env compactionEnv) (pc *pickedCompaction)
pickReadTriggeredCompaction(env compactionEnv) (pc *pickedCompaction)
forceBaseLevel1()
}
Expand Down Expand Up @@ -109,9 +110,9 @@ type pickedCompaction struct {
// startLevel is the level that is being compacted. Inputs from startLevel
// and outputLevel will be merged to produce a set of outputLevel files.
startLevel *compactionLevel
// outputLevel is the level that files are being produced in. outputLevel is
// equal to startLevel+1 except when startLevel is 0 in which case it is
// equal to compactionPicker.baseLevel().
// outputLevel is the level that files are being produced in. In default
// compactions, outputLevel is equal to startLevel+1 except when startLevel
// is 0 in which case it is equal to compactionPicker.baseLevel().
outputLevel *compactionLevel
// adjustedOutputLevel is the output level used for the purpose of
// determining the target output file size, overlap bytes, and expanded
Expand Down Expand Up @@ -1066,6 +1067,25 @@ func (p *compactionPickerByScore) pickAuto(env compactionEnv) (pc *pickedCompact
*env.readCompactionEnv.rescheduleReadCompaction = true
}

// At the lowest possible compaction-picking priority, look for files marked
// for compaction. Pebble will mark files for compaction if they have atomic
// compaction units that span multiple files. While current Pebble code does
// not construct such sstables, RocksDB and earlier versions of Pebble may
// have created them. These split user keys form sets of files that must be
// compacted together for correctness (referred to as "atomic compaction
// units" within the code). Rewrite them in-place.
//
// It's also possible that a file may have been marked for compaction by
// even earlier versions of Pebble code, since FileMetadata's
// MarkedForCompaction field is persisted in the manifest. That's okay. We
// previously would've ignored the designation, whereas now we'll re-compact
// the file in place.
if p.vers.Stats.MarkedForCompaction > 0 {
if pc := p.pickRewriteCompaction(env); pc != nil {
return pc
}
}

return nil
}

Expand Down Expand Up @@ -1129,6 +1149,50 @@ func (a elisionOnlyAnnotator) Merge(v interface{}, accum interface{}) interface{
return accumV
}

// markedForCompactionAnnotator implements the manifest.Annotator interface,
// annotating B-Tree nodes with the *fileMetadata of a file that is marked for
// compaction within the subtree. If multiple files meet the criteria, it
// chooses whichever file has the lowest LargestSeqNum.
type markedForCompactionAnnotator struct{}

var _ manifest.Annotator = markedForCompactionAnnotator{}

func (a markedForCompactionAnnotator) Zero(interface{}) interface{} {
return nil
}

func (a markedForCompactionAnnotator) Accumulate(
f *fileMetadata, dst interface{},
) (interface{}, bool) {
if !f.MarkedForCompaction {
// Not marked for compaction; return dst.
return dst, true
}
if dst == nil {
return f, true
} else if dstV := dst.(*fileMetadata); dstV.LargestSeqNum > f.LargestSeqNum {
return f, true
}
return dst, true
}

func (a markedForCompactionAnnotator) Merge(v interface{}, accum interface{}) interface{} {
if v == nil {
return accum
}
// If we haven't accumulated an eligible file yet, or f's LargestSeqNum is
// less than the accumulated file's, use f.
if accum == nil {
return v
}
f := v.(*fileMetadata)
accumV := accum.(*fileMetadata)
if accumV == nil || accumV.LargestSeqNum > f.LargestSeqNum {
return f
}
return accumV
}

// pickElisionOnlyCompaction looks for compactions of sstables in the
// bottommost level containing obsolete records that may now be dropped.
func (p *compactionPickerByScore) pickElisionOnlyCompaction(
Expand Down Expand Up @@ -1164,6 +1228,58 @@ func (p *compactionPickerByScore) pickElisionOnlyCompaction(
return nil
}

// pickRewriteCompaction attempts to construct a compaction that
// rewrites a file marked for compaction. pickRewriteCompaction will
// pull in adjacent files in the file's atomic compaction unit if
// necessary. A rewrite compaction outputs files to the same level as
// the input level.
func (p *compactionPickerByScore) pickRewriteCompaction(env compactionEnv) (pc *pickedCompaction) {
for l := numLevels - 1; l >= 0; l-- {
v := p.vers.Levels[l].Annotation(markedForCompactionAnnotator{})
if v == nil {
// Try the next level.
continue
}
candidate := v.(*fileMetadata)
if candidate.Compacting {
// Try the next level.
continue
}
lf := p.vers.Levels[l].Find(p.opts.Comparer.Compare, candidate)
if lf == nil {
panic(fmt.Sprintf("file %s not found in level %d as expected", candidate.FileNum, numLevels-1))
}

inputs := lf.Slice()
if l > 0 {
// Find this file's atomic compaction unit. This is only relevant
// for levels L1+.
var isCompacting bool
inputs, isCompacting = expandToAtomicUnit(
p.opts.Comparer.Compare,
inputs,
false, /* disableIsCompacting */
)
if isCompacting {
// Try the next level.
continue
}
}

pc = newPickedCompaction(p.opts, p.vers, l, l, p.baseLevel)
pc.outputLevel.level = l
pc.kind = compactionKindRewrite
pc.startLevel.files = inputs
pc.smallest, pc.largest = manifest.KeyRange(pc.cmp, pc.startLevel.files.Iter())

// Fail-safe to protect against compacting the same sstable concurrently.
if !inputRangeAlreadyCompacting(env, pc) {
return pc
}
}
return nil
}

// pickAutoLPositive picks an automatic compaction for the candidate
// file in a positive-numbered level. This function must not be used for
// L0.
Expand Down
6 changes: 6 additions & 0 deletions compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ func (p *compactionPickerForTesting) pickElisionOnlyCompaction(
return nil
}

func (p *compactionPickerForTesting) pickRewriteCompaction(
env compactionEnv,
) (pc *pickedCompaction) {
return nil
}

func (p *compactionPickerForTesting) pickManual(
env compactionEnv, manual *manualCompaction,
) (pc *pickedCompaction, retryLater bool) {
Expand Down
30 changes: 29 additions & 1 deletion data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -729,7 +729,7 @@ func runDBDefineCmd(td *datadriven.TestData, opts *Options) (*DB, error) {
jobID := d.mu.nextJobID
d.mu.nextJobID++
d.mu.versions.logLock()
if err := d.mu.versions.logAndApply(jobID, ve, newFileMetrics(ve.NewFiles), func() []compactionInfo {
if err := d.mu.versions.logAndApply(jobID, ve, newFileMetrics(ve.NewFiles), false, func() []compactionInfo {
return nil
}); err != nil {
return nil, err
Expand Down Expand Up @@ -822,6 +822,34 @@ func runIngestCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error {
return nil
}

func runForceIngestCmd(td *datadriven.TestData, d *DB) error {
var paths []string
var level int
for _, arg := range td.CmdArgs {
switch arg.Key {
case "paths":
paths = append(paths, arg.Vals...)
case "level":
var err error
level, err = strconv.Atoi(arg.Vals[0])
if err != nil {
return err
}
}
}
return d.ingest(paths, func(
tableNewIters,
IterOptions,
Compare,
*version,
int,
map[*compaction]struct{},
*fileMetadata,
) (int, error) {
return level, nil
})
}

func runLSMCmd(td *datadriven.TestData, d *DB) string {
d.mu.Lock()
s := d.mu.versions.currentVersion().DebugString(base.DefaultFormatter)
Expand Down
1 change: 1 addition & 0 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -1394,6 +1394,7 @@ func (d *DB) Metrics() *Metrics {
metrics.Compact.EstimatedDebt = d.mu.versions.picker.estimatedCompactionDebt(0)
metrics.Compact.InProgressBytes = atomic.LoadInt64(&d.mu.versions.atomic.atomicInProgressBytes)
metrics.Compact.NumInProgress = int64(d.mu.compact.compactingCount)
metrics.Compact.MarkedFiles = d.mu.versions.currentVersion().Stats.MarkedForCompaction
for _, m := range d.mu.mem.queue {
metrics.MemTable.Size += m.totalBytes()
}
Expand Down
2 changes: 1 addition & 1 deletion flush_external.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func flushExternalTable(untypedDB interface{}, path string, originalMeta *fileMe
TablesIngested: 1,
},
}
err := d.mu.versions.logAndApply(jobID, ve, metrics, func() []compactionInfo {
err := d.mu.versions.logAndApply(jobID, ve, metrics, false /* forceRotation */, func() []compactionInfo {
return d.getInProgressCompactionInfoLocked(nil)
})
if err != nil {
Expand Down
Loading

0 comments on commit 59caac4

Please sign in to comment.