diff --git a/compaction.go b/compaction.go
index 44dac8f2db..8d7bb061f0 100644
--- a/compaction.go
+++ b/compaction.go
@@ -285,6 +285,7 @@ const (
 	compactionKindDeleteOnly
 	compactionKindElisionOnly
 	compactionKindRead
+	compactionKindRewrite
 )
 
 func (k compactionKind) String() string {
@@ -301,6 +302,8 @@ func (k compactionKind) String() string {
 		return "elision-only"
 	case compactionKindRead:
 		return "read"
+	case compactionKindRewrite:
+		return "rewrite"
 	}
 	return "?"
 }
@@ -320,9 +323,9 @@ type compaction struct {
 	// startLevel is the level that is being compacted. Inputs from startLevel
 	// and outputLevel will be merged to produce a set of outputLevel files.
 	startLevel *compactionLevel
-	// outputLevel is the level that files are being produced in. outputLevel is
-	// equal to startLevel+1 except when startLevel is 0 in which case it is
-	// equal to compactionPicker.baseLevel().
+	// outputLevel is the level that files are being produced in. For default
+	// compactions, outputLevel is equal to startLevel+1 except when startLevel
+	// is 0 in which case it is equal to compactionPicker.baseLevel().
 	outputLevel *compactionLevel
 
 	inputs []compactionLevel
@@ -448,15 +451,9 @@ func newCompaction(pc *pickedCompaction, opts *Options, bytesCompacted *uint64)
 	}
 	c.setupInuseKeyRanges()
 
-	switch {
-	case pc.readTriggered:
-		c.kind = compactionKindRead
-	case c.startLevel.level == numLevels-1:
-		// This compaction is an L6->L6 elision-only compaction to rewrite
-		// a sstable without unnecessary tombstones.
-		c.kind = compactionKindElisionOnly
-	case c.outputLevel.files.Empty() && c.startLevel.files.Len() == 1 &&
-		c.grandparents.SizeSum() <= c.maxOverlapBytes:
+	c.kind = pc.kind
+	if c.kind == compactionKindDefault && c.outputLevel.files.Empty() &&
+		c.startLevel.files.Len() == 1 && c.grandparents.SizeSum() <= c.maxOverlapBytes {
 		// This compaction can be converted into a trivial move from one level
 		// to the next. We avoid such a move if there is lots of overlapping
 		// grandparent data. Otherwise, the move could create a parent file
@@ -1548,6 +1545,7 @@ func (d *DB) maybeScheduleCompactionPicker(
 	}
 
 	env := compactionEnv{
+		nonatomicFileCount:      &d.mu.compact.nonatomicFileCount,
 		bytesCompacted:          &d.atomic.bytesCompacted,
 		earliestSnapshotSeqNum:  d.mu.snapshots.earliest(),
 		earliestUnflushedSeqNum: d.getEarliestUnflushedSeqNumLocked(),
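Note on the newCompaction change above: the picker now decides the compaction kind up front, and newCompaction retains only the trivial-move conversion. A minimal sketch of that remaining gate, using simplified, hypothetical parameters rather than the real *compaction fields:

	// canTrivialMove reports whether a picked default compaction may be
	// converted into a move of a single sstable to the output level.
	// Rewrite, read, and elision-only compactions are never converted:
	// they exist to rewrite or drop data, not to move it.
	func canTrivialMove(kind compactionKind, outputEmpty bool, startFileCount int,
		grandparentBytes, maxOverlapBytes uint64) bool {
		return kind == compactionKindDefault && outputEmpty &&
			startFileCount == 1 && grandparentBytes <= maxOverlapBytes
	}
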
diff --git a/compaction_picker.go b/compaction_picker.go
index a7eb5b48c0..c293a99c3b 100644
--- a/compaction_picker.go
+++ b/compaction_picker.go
@@ -27,6 +27,13 @@ type compactionEnv struct {
 	earliestSnapshotSeqNum  uint64
 	inProgressCompactions   []compactionInfo
 	readCompactionEnv       readCompactionEnv
+
+	// nonatomicFileCount is protected by DB.mu. It holds the most-recently
+	// calculated count of files that are not their own atomic compaction unit,
+	// as calculated by findSplitUserKey. It's a pointer so that the compaction
+	// picker may update the count when it performs a scan of the LSM looking
+	// for such a file.
+	nonatomicFileCount *int
 }
 
 type compactionPicker interface {
@@ -37,6 +44,7 @@ type compactionPicker interface {
 	pickAuto(env compactionEnv) (pc *pickedCompaction)
 	pickManual(env compactionEnv, manual *manualCompaction) (c *pickedCompaction, retryLater bool)
 	pickElisionOnlyCompaction(env compactionEnv) (pc *pickedCompaction)
+	pickRewriteCompaction(env compactionEnv, level int, file manifest.LevelFile) (pc *pickedCompaction)
 	pickReadTriggeredCompaction(env compactionEnv) (pc *pickedCompaction)
 	forceBaseLevel1()
 }
@@ -103,15 +111,15 @@ type pickedCompaction struct {
 	// score of the chosen compaction. Taken from candidateLevelInfo.
 	score float64
 
-	// readTrigger is true if the compaction was triggered due to reads.
-	readTriggered bool
+	// kind indicates the kind of compaction.
+	kind compactionKind
 
 	// startLevel is the level that is being compacted. Inputs from startLevel
 	// and outputLevel will be merged to produce a set of outputLevel files.
 	startLevel *compactionLevel
-	// outputLevel is the level that files are being produced in. outputLevel is
-	// equal to startLevel+1 except when startLevel is 0 in which case it is
-	// equal to compactionPicker.baseLevel().
+	// outputLevel is the level that files are being produced in. In default
+	// compactions, outputLevel is equal to startLevel+1 except when startLevel
+	// is 0 in which case it is equal to compactionPicker.baseLevel().
 	outputLevel *compactionLevel
 	// adjustedOutputLevel is the output level used for the purpose of
 	// determining the target output file size, overlap bytes, and expanded
@@ -142,12 +150,7 @@ type pickedCompaction struct {
 	version *version
 }
 
-func newPickedCompaction(opts *Options, cur *version, startLevel, baseLevel int) *pickedCompaction {
-	if startLevel > 0 && startLevel < baseLevel {
-		panic(fmt.Sprintf("invalid compaction: start level %d should not be empty (base level %d)",
-			startLevel, baseLevel))
-	}
-
+func defaultOutputLevel(startLevel, baseLevel int) int {
 	outputLevel := startLevel + 1
 	if startLevel == 0 {
 		outputLevel = baseLevel
@@ -155,6 +158,15 @@ func newPickedCompaction(opts *Options, cur *version, startLevel, baseLevel int)
 	if outputLevel >= numLevels-1 {
 		outputLevel = numLevels - 1
 	}
+	return outputLevel
+}
+
+func newPickedCompaction(opts *Options, cur *version, startLevel, outputLevel, baseLevel int) *pickedCompaction {
+	if startLevel > 0 && startLevel < baseLevel {
+		panic(fmt.Sprintf("invalid compaction: start level %d should not be empty (base level %d)",
+			startLevel, baseLevel))
+	}
+
 	// Output level is in the range [baseLevel,numLevels]. For the purpose of
 	// determining the target output file size, overlap bytes, and expanded
 	// bytes, we want to adjust the range to [1,numLevels].
@@ -177,7 +189,7 @@ func newPickedCompaction(opts *Options, cur *version, startLevel, baseLevel int)
 func newPickedCompactionFromL0(
 	lcf *manifest.L0CompactionFiles, opts *Options, vers *version, baseLevel int, isBase bool,
 ) *pickedCompaction {
-	pc := newPickedCompaction(opts, vers, 0, baseLevel)
+	pc := newPickedCompaction(opts, vers, 0, baseLevel, baseLevel)
 	pc.lcf = lcf
 	if !isBase {
 		pc.outputLevel.level = 0
@@ -1053,6 +1065,29 @@ func (p *compactionPickerByScore) pickAuto(env compactionEnv) (pc *pickedCompact
 		*env.readCompactionEnv.rescheduleReadCompaction = true
 	}
 
+	// At the lowest possible compaction-picking priority, look for atomic
+	// compaction units that span multiple files. While current Pebble code does
+	// not construct such sstables, RocksDB and earlier versions of Pebble may
+	// have created them. These split user keys form sets of files that must be
+	// compacted together for correctness (referred to as "atomic compaction
+	// units" within the code). Rewrite them in-place.
+	if *env.nonatomicFileCount > 0 {
+		count, level, file, ok := findSplitUserKey(p.opts, p.vers)
+		// Save the newly established count so that it may be reported in
+		// DB.Metrics, and so a subsequent pickAuto doesn't need to scan the LSM
+		// if there are no longer any such files. NB: It's okay to mutate
+		// nonatomicFileCount because compaction-picking is performed while
+		// holding DB.mu.
+		*env.nonatomicFileCount = count
+		if !ok {
+			// There are no multi-file atomic compaction units in the database.
+			return nil
+		}
+		if pc := p.pickRewriteCompaction(env, level, file); pc != nil {
+			return pc
+		}
+	}
+
 	return nil
 }
@@ -1136,13 +1171,43 @@ func (p *compactionPickerByScore) pickElisionOnlyCompaction(
 
 	// Construct a picked compaction of the elision candidate's atomic
 	// compaction unit.
-	pc = newPickedCompaction(p.opts, p.vers, numLevels-1, p.baseLevel)
+	pc = newPickedCompaction(p.opts, p.vers, numLevels-1, numLevels-1, p.baseLevel)
 	var isCompacting bool
 	pc.startLevel.files, isCompacting = expandToAtomicUnit(p.opts.Comparer.Compare, lf.Slice(), false /* disableIsCompacting */)
 	if isCompacting {
 		return nil
 	}
+	pc.kind = compactionKindElisionOnly
+	pc.smallest, pc.largest = manifest.KeyRange(pc.cmp, pc.startLevel.files.Iter())
+	// Fail-safe to protect against compacting the same sstable concurrently.
+	if !inputRangeAlreadyCompacting(env, pc) {
+		return pc
+	}
+	return nil
+}
+
+// pickRewriteCompaction attempts to construct a compaction that rewrites the
+// provided file in the provided level. pickRewriteCompaction will pull in
+// adjacent files in the file's atomic compaction unit if necessary. A rewrite
+// compaction outputs files to the same level as the input level.
+func (p *compactionPickerByScore) pickRewriteCompaction(env compactionEnv, level int, file manifest.LevelFile) (pc *pickedCompaction) {
+	// Find this file's atomic compaction unit.
+	atomicUnit, isCompacting := expandToAtomicUnit(
+		p.opts.Comparer.Compare,
+		file.Slice(),
+		false, /* disableIsCompacting */
+	)
+	if isCompacting {
+		return nil
+	}
+
+	pc = newPickedCompaction(p.opts, p.vers, level, level, p.baseLevel)
+	pc.outputLevel.level = level
+	pc.adjustedOutputLevel = 1 + level - p.baseLevel
+	pc.kind = compactionKindRewrite
+	pc.startLevel.files = atomicUnit
 	pc.smallest, pc.largest = manifest.KeyRange(pc.cmp, pc.startLevel.files.Iter())
+	// Fail-safe to protect against compacting the same sstable concurrently.
 	if !inputRangeAlreadyCompacting(env, pc) {
 		return pc
@@ -1162,7 +1227,8 @@ func pickAutoHelper(
 		return pickIntraL0(env, opts, vers)
 	}
 
-	pc = newPickedCompaction(opts, vers, cInfo.level, baseLevel)
+	outputLevel := defaultOutputLevel(cInfo.level, baseLevel)
+	pc = newPickedCompaction(opts, vers, cInfo.level, outputLevel, baseLevel)
 	if pc.outputLevel.level != cInfo.outputLevel {
 		panic("pebble: compaction picked unexpected output level")
 	}
@@ -1325,8 +1391,7 @@ func pickIntraL0(env compactionEnv, opts *Options, vers *version) (pc *pickedCom
 	if compactTotalCount < minIntraL0Count {
 		return nil
 	}
-
-	pc = newPickedCompaction(opts, vers, 0, 0)
+	pc = newPickedCompaction(opts, vers, 0, 0, 0)
 	pc.startLevel.files = compactFiles
 	pc.smallest, pc.largest = manifest.KeyRange(pc.cmp, compactFiles.Iter())
 	// Output only a single sstable for intra-L0 compactions. There is no current
@@ -1383,7 +1448,8 @@ func pickManualHelper(
 	baseLevel int,
 	diskAvailBytes func() uint64,
 ) (pc *pickedCompaction) {
-	pc = newPickedCompaction(opts, vers, manual.level, baseLevel)
+	outputLevel := defaultOutputLevel(manual.level, baseLevel)
+	pc = newPickedCompaction(opts, vers, manual.level, outputLevel, baseLevel)
 	manual.outputLevel = pc.outputLevel.level
 	cmp := opts.Comparer.Compare
 	pc.startLevel.files = vers.Overlaps(manual.level, cmp, manual.start.UserKey,
@@ -1439,7 +1505,8 @@ func pickReadTriggeredCompactionHelper(
 		return nil
 	}
 
-	pc = newPickedCompaction(p.opts, p.vers, rc.level, p.baseLevel)
+	outputLevel := defaultOutputLevel(rc.level, p.baseLevel)
+	pc = newPickedCompaction(p.opts, p.vers, rc.level, outputLevel, p.baseLevel)
 	pc.startLevel.files = overlapSlice
 
 	if !pc.setupInputs(p.opts, p.diskAvailBytes()) {
@@ -1448,7 +1515,7 @@ func pickReadTriggeredCompactionHelper(
 	if inputRangeAlreadyCompacting(env, pc) {
 		return nil
 	}
-	pc.readTriggered = true
+	pc.kind = compactionKindRead
 
 	// Prevent read compactions which are too wide.
 	outputOverlaps := pc.version.Overlaps(
@@ -1544,3 +1611,32 @@ func conflictsWithInProgress(
 	}
 	return false
 }
+
+// findSplitUserKey scans the LSM's levels 1 through 6 for adjacent files that
+// contain the same user key. Such arrangements of files were permitted in
+// RocksDB and in Pebble up to SHA a860bbad. findSplitUserKey returns the count
+// of such files (it may double-count an atomic compaction unit), plus the
+// level and LevelFile for one such file.
+func findSplitUserKey(opts *Options, vers *version) (count, level int, file manifest.LevelFile, ok bool) {
+	equal := opts.equal()
+	for l := numLevels - 1; l > 0; l-- {
+		iter := vers.Levels[l].Iter()
+
+		var userKey []byte
+		for f := iter.First(); f != nil; f = iter.Next() {
+			if userKey != nil && equal(userKey, f.Smallest.UserKey) {
+				if !ok {
+					// First eligible file found.
+					level, file, ok = l, iter.Take(), true
+				}
+				count++
+			}
+			if f.Largest.IsExclusiveSentinel() {
+				userKey = nil
+			} else {
+				userKey = f.Largest.UserKey
+			}
+		}
+	}
+	return count, level, file, ok
+}
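For illustration, the scan findSplitUserKey performs over each level reduces to a single pass in key order: a file belongs to a multi-file atomic compaction unit when its smallest user key equals the previous file's largest user key, unless that largest key is an exclusive sentinel (for example, a range deletion end boundary). A self-contained sketch with hypothetical stand-in types (the real code walks a manifest.LevelIterator and compares keys with the Comparer's equality function):

	// bounds stands in for a file's smallest/largest user keys.
	type bounds struct {
		smallest, largest string
		largestExclusive  bool // e.g. a range tombstone's end boundary
	}

	// countSplitUserKeys counts files in a key-sorted level whose smallest
	// user key continues the previous file's largest user key.
	func countSplitUserKeys(level []bounds) (count int) {
		var prev string
		havePrev := false
		for _, f := range level {
			if havePrev && prev == f.smallest {
				count++
			}
			// An exclusive largest boundary cannot split a user key
			// across files, so clear the carried key.
			havePrev = !f.largestExclusive
			prev = f.largest
		}
		return count
	}
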
diff --git a/compaction_picker_test.go b/compaction_picker_test.go
index 5500ab29dd..aced388fb0 100644
--- a/compaction_picker_test.go
+++ b/compaction_picker_test.go
@@ -191,6 +191,7 @@ func TestCompactionPickerTargetLevel(t *testing.T) {
 		var inProgress []compactionInfo
 		for {
 			env := compactionEnv{
+				nonatomicFileCount:      new(int),
 				earliestUnflushedSeqNum: InternalKeySeqNumMax,
 				inProgressCompactions:   inProgress,
 			}
@@ -268,6 +269,7 @@ func TestCompactionPickerTargetLevel(t *testing.T) {
 		}
 
 		pc := pickerByScore.pickAuto(compactionEnv{
+			nonatomicFileCount:      new(int),
 			earliestUnflushedSeqNum: InternalKeySeqNumMax,
 			inProgressCompactions:   inProgress,
 		})
@@ -610,6 +612,7 @@ func TestCompactionPickerL0(t *testing.T) {
 			}
 
 			pc = picker.pickAuto(compactionEnv{
+				nonatomicFileCount:      new(int),
 				bytesCompacted:          new(uint64),
 				earliestUnflushedSeqNum: math.MaxUint64,
 				inProgressCompactions:   inProgressCompactions,
@@ -833,6 +836,7 @@ func TestCompactionPickerConcurrency(t *testing.T) {
 			}
 
 			pc := picker.pickAuto(compactionEnv{
+				nonatomicFileCount:      new(int),
 				bytesCompacted:          new(uint64),
 				earliestUnflushedSeqNum: math.MaxUint64,
 				inProgressCompactions:   inProgressCompactions,
@@ -982,6 +986,7 @@ func TestCompactionPickerPickReadTriggered(t *testing.T) {
 
 		case "pick-auto":
 			pc := picker.pickAuto(compactionEnv{
+				nonatomicFileCount:      new(int),
 				bytesCompacted:          new(uint64),
 				earliestUnflushedSeqNum: math.MaxUint64,
 				readCompactionEnv: readCompactionEnv{
@@ -1293,6 +1298,7 @@ func TestCompactionOutputFileSize(t *testing.T) {
 
 		case "pick-auto":
 			pc := picker.pickAuto(compactionEnv{
+				nonatomicFileCount:      new(int),
 				bytesCompacted:          new(uint64),
 				earliestUnflushedSeqNum: math.MaxUint64,
 				earliestSnapshotSeqNum:  math.MaxUint64,
diff --git a/compaction_test.go b/compaction_test.go
index 15c69c1623..55b08ad287 100644
--- a/compaction_test.go
+++ b/compaction_test.go
@@ -92,6 +92,12 @@ func (p *compactionPickerForTesting) pickElisionOnlyCompaction(
 	return nil
 }
 
+func (p *compactionPickerForTesting) pickRewriteCompaction(
+	env compactionEnv, level int, file manifest.LevelFile,
+) (pc *pickedCompaction) {
+	return nil
+}
+
 func (p *compactionPickerForTesting) pickManual(
 	env compactionEnv, manual *manualCompaction,
 ) (pc *pickedCompaction, retryLater bool) {
@@ -1566,7 +1572,7 @@ func TestCompactionOutputLevel(t *testing.T) {
 			var start, base int
 			d.ScanArgs(t, "start", &start)
 			d.ScanArgs(t, "base", &base)
-			pc := newPickedCompaction(opts, version, start, base)
+			pc := newPickedCompaction(opts, version, start, defaultOutputLevel(start, base), base)
 			c := newCompaction(pc, opts, new(uint64))
 			return fmt.Sprintf("output=%d\nmax-output-file-size=%d\n",
 				c.outputLevel.level, c.maxOutputFileSize)
diff --git a/data_test.go b/data_test.go
index eeb6f2c4d0..355317adc0 100644
--- a/data_test.go
+++ b/data_test.go
@@ -812,6 +812,34 @@ func runIngestCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error {
 	return nil
 }
 
+func runForceIngestCmd(td *datadriven.TestData, d *DB) error {
+	var paths []string
+	var level int
+	for _, arg := range td.CmdArgs {
+		switch arg.Key {
+		case "paths":
+			paths = append(paths, arg.Vals...)
+		case "level":
+			var err error
+			level, err = strconv.Atoi(arg.Vals[0])
+			if err != nil {
+				return err
+			}
+		}
+	}
+	return d.ingest(paths, func(
+		tableNewIters,
+		IterOptions,
+		Compare,
+		*version,
+		int,
+		map[*compaction]struct{},
+		*fileMetadata,
+	) (int, error) {
+		return level, nil
+	})
+}
+
 func runLSMCmd(td *datadriven.TestData, d *DB) string {
 	d.mu.Lock()
 	s := d.mu.versions.currentVersion().DebugString(base.DefaultFormatter)
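The force-ingest command exists because the public API can no longer create multi-file atomic compaction units; tests need to plant sstables at a chosen level. The injected ingestTargetLevelFunc is what makes that possible. A hedged sketch of the same pattern from a test's point of view (the literal level is illustrative):

	// Pin every ingested sstable to L1 instead of computing a target
	// level, mirroring what runForceIngestCmd does with its level arg.
	forceL1 := func(
		tableNewIters, IterOptions, Compare, *version, int,
		map[*compaction]struct{}, *fileMetadata,
	) (int, error) {
		return 1, nil
	}
	err := d.ingest(paths, forceL1)
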
diff --git a/db.go b/db.go
index f536fe1e8c..e32bea044f 100644
--- a/db.go
+++ b/db.go
@@ -397,6 +397,16 @@ type DB struct {
 		// readCompactions is a readCompactionQueue which keeps track of the
 		// compactions which we might have to perform.
 		readCompactions readCompactionQueue
+
+		// nonatomicFileCount records the current count of files that are
+		// not their own atomic compaction unit: that is, the count of files
+		// that share a user key with a neighboring file in levels L1-L6.
+		// Such files are compacted in the background in a rewrite
+		// compaction if there are no other available compactions. This stat
+		// is updated by the compaction picker when it looks for one of
+		// these compactions, by the FormatSplitUserKeys format major
+		// version migration and by Open.
+		nonatomicFileCount int
 	}
 
 	cleaner struct {
@@ -1346,6 +1356,7 @@ func (d *DB) Metrics() *Metrics {
 	metrics.Compact.EstimatedDebt = d.mu.versions.picker.estimatedCompactionDebt(0)
 	metrics.Compact.InProgressBytes = atomic.LoadInt64(&d.mu.versions.atomic.atomicInProgressBytes)
 	metrics.Compact.NumInProgress = int64(d.mu.compact.compactingCount)
+	metrics.Compact.NonatomicFiles = d.mu.compact.nonatomicFileCount
 	for _, m := range d.mu.mem.queue {
 		metrics.MemTable.Size += m.totalBytes()
 	}
diff --git a/format_major_version.go b/format_major_version.go
index 517074382d..9dfd0dbc28 100644
--- a/format_major_version.go
+++ b/format_major_version.go
@@ -70,6 +70,11 @@ const (
 	// FormatBlockPropertyCollector is a format major version that introduces
 	// BlockPropertyCollectors.
 	FormatBlockPropertyCollector
+	// FormatSplitUserKeys is a format major version that guarantees that all
+	// versions of a single user key are not split across multiple files within
+	// a level. Ratcheting to the FormatSplitUserKeys version will block until
+	// all necessary compactions are complete.
+	FormatSplitUserKeys
 	// FormatRangeKeys is a format major version that introduces range keys.
 	FormatRangeKeys
 	// FormatNewest always contains the most recent format major version.
@@ -86,7 +91,7 @@ func (v FormatMajorVersion) MaxTableFormat() sstable.TableFormat {
 	case FormatDefault, FormatMostCompatible, formatVersionedManifestMarker,
 		FormatVersioned, FormatSetWithDelete:
 		return sstable.TableFormatRocksDBv2
-	case FormatBlockPropertyCollector:
+	case FormatBlockPropertyCollector, FormatSplitUserKeys:
 		return sstable.TableFormatPebblev1
 	case FormatRangeKeys:
 		return sstable.TableFormatPebblev2
@@ -171,6 +176,14 @@ var formatMajorVersionMigrations = map[FormatMajorVersion]func(*DB) error{
 	FormatBlockPropertyCollector: func(d *DB) error {
 		return d.finalizeFormatVersUpgrade(FormatBlockPropertyCollector)
 	},
+	FormatSplitUserKeys: func(d *DB) error {
+		// Before finalizing the format major version, rewrite any sstables that
+		// form multi-file atomic compaction units.
+		if err := d.rewriteSplitUserKeysLocked(); err != nil {
+			return err
+		}
+		return d.finalizeFormatVersUpgrade(FormatSplitUserKeys)
+	},
 	FormatRangeKeys: func(d *DB) error {
 		return d.finalizeFormatVersUpgrade(FormatRangeKeys)
 	},
@@ -274,3 +287,53 @@ func (d *DB) finalizeFormatVersUpgrade(formatVers FormatMajorVersion) error {
 	d.opts.EventListener.FormatUpgrade(formatVers)
 	return nil
 }
+
+// rewriteSplitUserKeysLocked performs a migration that rewrites adjacent
+// sstables containing the same user key within levels 1-6. While current Pebble
+// code does not construct such sstables, RocksDB and earlier versions of Pebble
+// may have created them. These split user keys form sets of files that must be
+// compacted together for correctness (referred to as "atomic compaction units"
+// within the code).
+//
+// This migration will allow future versions of Pebble to simplify code and
+// remove the concept of 'atomic compaction units'.
+//
+// rewriteSplitUserKeysLocked is run while ratcheting the database's format
+// major version to FormatSplitUserKeys.
+func (d *DB) rewriteSplitUserKeysLocked() error {
+	for {
+		// Look for any files that we must compact.
+		count, level, file, ok := findSplitUserKey(d.opts, d.mu.versions.currentVersion())
+		d.mu.compact.nonatomicFileCount = count
+		if !ok {
+			// There are no multi-file atomic compaction units in the database.
+			return nil
+		}
+
+		// Attempt to schedule a compaction to rewrite the split user key.
+		d.maybeScheduleCompactionPicker(func(picker compactionPicker, env compactionEnv) *pickedCompaction {
+			pc := picker.pickRewriteCompaction(env, level, file)
+			return pc
+		})
+
+		// The above attempt might succeed and schedule a rewrite compaction. Or
+		// there might not be available compaction concurrency to schedule the
+		// compaction. Or compaction of the file might have already been in
+		// progress. In any scenario, wait until there's some change in the
+		// state of active compactions.
+
+		// Before waiting, check that the database hasn't been closed. Trying to
+		// schedule the compaction may have dropped d.mu while waiting for a
+		// manifest write to complete. In that dropped interim, the database may
+		// have been closed.
+		if err := d.closed.Load(); err != nil {
+			return err.(error)
+		}
+		d.mu.compact.cond.Wait()
+
+		// Some flush or compaction was scheduled or completed. Loop again to
+		// check again for files that must be compacted. The next iteration may
+		// find the same file again, but that's okay. It'll eventually succeed
+		// in scheduling the compaction and eventually be woken by its
+		// completion.
+	}
+}
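From a caller's perspective the migration is driven entirely by ratcheting the format major version. A hedged usage sketch (the store path and error handling are illustrative; FormatSplitUserKeys is the constant introduced above):

	db, err := pebble.Open("demo", &pebble.Options{})
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
	// Blocks until every multi-file atomic compaction unit has been
	// rewritten in place by a rewrite compaction.
	if err := db.RatchetFormatMajorVersion(pebble.FormatSplitUserKeys); err != nil {
		log.Fatal(err)
	}
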
diff --git a/format_major_version_test.go b/format_major_version_test.go
index 761a8fe2e8..5c044e3230 100644
--- a/format_major_version_test.go
+++ b/format_major_version_test.go
@@ -5,9 +5,14 @@
 package pebble
 
 import (
+	"bytes"
 	"fmt"
+	"strconv"
 	"testing"
+	"time"
 
+	"github.com/cockroachdb/pebble/internal/base"
+	"github.com/cockroachdb/pebble/internal/datadriven"
 	"github.com/cockroachdb/pebble/sstable"
 	"github.com/cockroachdb/pebble/vfs"
 	"github.com/cockroachdb/pebble/vfs/atomicfs"
@@ -191,6 +196,7 @@ func TestFormatMajorVersions_TableFormat(t *testing.T) {
 		FormatVersioned:              sstable.TableFormatRocksDBv2,
 		FormatSetWithDelete:          sstable.TableFormatRocksDBv2,
 		FormatBlockPropertyCollector: sstable.TableFormatPebblev1,
+		FormatSplitUserKeys:          sstable.TableFormatPebblev1,
 		FormatRangeKeys:              sstable.TableFormatPebblev2,
 	}
 
@@ -204,3 +210,105 @@ func TestFormatMajorVersions_TableFormat(t *testing.T) {
 	fmv := FormatNewest + 1
 	require.Panics(t, func() { _ = fmv.MaxTableFormat() })
 }
+
+func TestSplitUserKeyMigration(t *testing.T) {
+	var d *DB
+	var opts *Options
+	var fs vfs.FS
+	var buf bytes.Buffer
+	defer func() {
+		if d != nil {
+			require.NoError(t, d.Close())
+		}
+	}()
+
+	datadriven.RunTest(t, "testdata/split_user_key_migration",
+		func(td *datadriven.TestData) string {
+			switch td.Cmd {
+			case "define":
+				if d != nil {
+					if err := d.Close(); err != nil {
+						return err.Error()
+					}
+					buf.Reset()
+				}
+				opts = &Options{
+					FormatMajorVersion: FormatBlockPropertyCollector,
+					EventListener: EventListener{
+						CompactionEnd: func(info CompactionInfo) {
+							// JobIDs aren't deterministic, especially w/ table stats
+							// enabled. Use a fixed job ID for data-driven test output.
+							info.JobID = 100
+							fmt.Fprintln(&buf, info)
+						},
+					},
+				}
+				var err error
+				if d, err = runDBDefineCmd(td, opts); err != nil {
+					return err.Error()
+				}
+
+				// Mock time so that we get consistent log output written to
+				// buf.
+				t := time.Now()
+				d.timeNow = func() time.Time {
+					t = t.Add(time.Second)
+					return t
+				}
+				fs = d.opts.FS
+				d.mu.Lock()
+				defer d.mu.Unlock()
+				return d.mu.versions.currentVersion().DebugString(base.DefaultFormatter)
+			case "reopen":
+				if d != nil {
+					if err := d.Close(); err != nil {
+						return err.Error()
+					}
+					buf.Reset()
+				}
+				opts.FS = fs
+				var err error
+				d, err = Open("", opts)
+				if err != nil {
+					return err.Error()
+				}
+				// Mock time so that we get consistent log output written to
+				// buf.
+				t := time.Now()
+				d.timeNow = func() time.Time {
+					t = t.Add(time.Second)
+					return t
+				}
+				m := d.Metrics()
+				return fmt.Sprintf("%d nonatomic-files at open", m.Compact.NonatomicFiles)
+			case "build":
+				if err := runBuildCmd(td, d, fs); err != nil {
+					return err.Error()
+				}
+				return ""
+			case "force-ingest":
+				if err := runForceIngestCmd(td, d); err != nil {
+					return err.Error()
+				}
+				d.mu.Lock()
+				defer d.mu.Unlock()
+				return d.mu.versions.currentVersion().DebugString(base.DefaultFormatter)
+			case "format-major-version":
+				return d.FormatMajorVersion().String()
+			case "ratchet-format-major-version":
+				v, err := strconv.Atoi(td.CmdArgs[0].String())
+				if err != nil {
+					return err.Error()
+				}
+				if err := d.RatchetFormatMajorVersion(FormatMajorVersion(v)); err != nil {
+					return err.Error()
+				}
+				return buf.String()
+			case "lsm":
+				return runLSMCmd(td, d)
+			default:
+				return fmt.Sprintf("unrecognized command %q", td.Cmd)
+			}
+		})
+}
diff --git a/ingest.go b/ingest.go
index 6f94c75a9c..6e8b078e78 100644
--- a/ingest.go
+++ b/ingest.go
@@ -618,7 +618,10 @@ func (d *DB) Ingest(paths []string) error {
 	if d.opts.ReadOnly {
 		return ErrReadOnly
 	}
+	return d.ingest(paths, ingestTargetLevel)
+}
 
+func (d *DB) ingest(paths []string, targetLevelFunc ingestTargetLevelFunc) error {
 	// Allocate file numbers for all of the files being ingested and mark them as
 	// pending in order to prevent them from being deleted. Note that this causes
 	// the file number ordering to be out of alignment with sequence number
@@ -715,7 +718,7 @@ func (d *DB) Ingest(paths []string) error {
 
 		// Assign the sstables to the correct level in the LSM and apply the
 		// version edit.
-		ve, err = d.ingestApply(jobID, meta)
+		ve, err = d.ingestApply(jobID, meta, targetLevelFunc)
 	}
 
 	d.commit.AllocateSeqNum(len(meta), prepare, apply)
@@ -753,7 +756,17 @@ func (d *DB) Ingest(paths []string) error {
 	return err
 }
 
-func (d *DB) ingestApply(jobID int, meta []*fileMetadata) (*versionEdit, error) {
+type ingestTargetLevelFunc func(
+	newIters tableNewIters,
+	iterOps IterOptions,
+	cmp Compare,
+	v *version,
+	baseLevel int,
+	compactions map[*compaction]struct{},
+	meta *fileMetadata,
+) (int, error)
+
+func (d *DB) ingestApply(jobID int, meta []*fileMetadata, findTargetLevel ingestTargetLevelFunc) (*versionEdit, error) {
 	d.mu.Lock()
 	defer d.mu.Unlock()
 
@@ -778,7 +791,7 @@ func (d *DB) ingestApply(jobID int, meta []*fileMetadata) (*versionEdit, error)
 		m := meta[i]
 		f := &ve.NewFiles[i]
 		var err error
-		f.Level, err = ingestTargetLevel(d.newIters, iterOps, d.cmp, current, baseLevel, d.mu.compact.inProgress, m)
+		f.Level, err = findTargetLevel(d.newIters, iterOps, d.cmp, current, baseLevel, d.mu.compact.inProgress, m)
 		if err != nil {
 			d.mu.versions.logUnlock()
 			return nil, err
diff --git a/metrics.go b/metrics.go
index e4319ffd92..6a8892862d 100644
--- a/metrics.go
+++ b/metrics.go
@@ -133,6 +133,7 @@ type Metrics struct {
 		ElisionOnlyCount int64
 		MoveCount        int64
 		ReadCount        int64
+		RewriteCount     int64
 		// An estimate of the number of bytes that need to be compacted for the LSM
 		// to reach a stable state.
 		EstimatedDebt uint64
@@ -142,6 +143,13 @@ type Metrics struct {
 		InProgressBytes int64
 		// Number of compactions that are in-progress.
 		NumInProgress int64
+		// NonatomicFiles is a count of files that do not form their own atomic
+		// compaction unit, and thus always need to be compacted with some of
+		// their neighbors. Such files are no longer created and are scheduled
+		// for rewrite compactions when no other compactions are picked. This
+		// metric may be removed once FormatSplitUserKeys is the minimum
+		// supported format major version.
+		NonatomicFiles int
 	}
 
 	Flush struct {
@@ -375,12 +383,13 @@ func (m *Metrics) SafeFormat(w redact.SafePrinter, _ rune) {
 		humanize.IEC.Int64(m.Compact.InProgressBytes),
 		redact.Safe(m.Compact.NumInProgress),
 		redact.SafeString(""))
-	w.Printf("  ctype %9d %7d %7d %7d %7d (default, delete, elision, move, read)\n",
+	w.Printf("  ctype %9d %7d %7d %7d %7d %7d (default, delete, elision, move, read, rewrite)\n",
 		redact.Safe(m.Compact.DefaultCount),
 		redact.Safe(m.Compact.DeleteOnlyCount),
 		redact.Safe(m.Compact.ElisionOnlyCount),
 		redact.Safe(m.Compact.MoveCount),
-		redact.Safe(m.Compact.ReadCount))
+		redact.Safe(m.Compact.ReadCount),
+		redact.Safe(m.Compact.RewriteCount))
 	w.Printf("  memtbl %9d %7s\n",
 		redact.Safe(m.MemTable.Count),
 		humanize.IEC.Uint64(m.MemTable.Size))
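Because rewrite compactions run at the lowest picking priority, draining a large backlog can take a while on a busy store. The new NonatomicFiles metric lets operators watch the backlog shrink; a hedged sketch of reading it:

	// NonatomicFiles counts files still awaiting a rewrite compaction.
	if n := db.Metrics().Compact.NonatomicFiles; n > 0 {
		log.Printf("split-user-key migration: %d files remaining", n)
	}
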
diff --git a/metrics_test.go b/metrics_test.go
index 844d4653d6..b6400e1b57 100644
--- a/metrics_test.go
+++ b/metrics_test.go
@@ -28,6 +28,7 @@ func TestMetricsFormat(t *testing.T) {
 	m.Compact.ElisionOnlyCount = 29
 	m.Compact.MoveCount = 30
 	m.Compact.ReadCount = 31
+	m.Compact.RewriteCount = 32
 	m.Compact.EstimatedDebt = 6
 	m.Compact.InProgressBytes = 7
 	m.Compact.NumInProgress = 2
@@ -85,7 +86,7 @@ __level_____count____size___score______in__ingest(sz_cnt)____move(sz_cnt)___writ
   total      2807   2.7 K       -   2.8 K   2.8 K   2.9 K   2.8 K   2.9 K   8.4 K   5.7 K   2.8 K      28     3.0
   flush         8
 compact         5     6 B     7 B       2  (size == estimated-debt, score = in-progress-bytes, in = num-in-progress)
-  ctype        27      28      29      30      31  (default, delete, elision, move, read)
+  ctype        27      28      29      30      31      32  (default, delete, elision, move, read, rewrite)
  memtbl        12    11 B
 zmemtbl        14    13 B
    ztbl        16    15 B
@@ -228,7 +229,7 @@ __level_____count____size___score______in__ingest(sz_cnt)____move(sz_cnt)___writ
   total         0     0 B       -     0 B     0 B       0     0 B       0     0 B       0     0 B       0     0.0
   flush         0
 compact         0     0 B     0 B       0  (size == estimated-debt, score = in-progress-bytes, in = num-in-progress)
-  ctype         0       0       0       0       0  (default, delete, elision, move, read)
+  ctype         0       0       0       0       0       0  (default, delete, elision, move, read, rewrite)
  memtbl         0     0 B
 zmemtbl         0     0 B
    ztbl         0     0 B
diff --git a/open.go b/open.go
index 5f98df058f..d21c61ff89 100644
--- a/open.go
+++ b/open.go
@@ -453,6 +453,15 @@ func Open(dirname string, opts *Options) (db *DB, _ error) {
 	}
 	d.calculateDiskAvailableBytes()
 
+	// If the format major version is earlier than FormatSplitUserKeys, there
+	// might be files that do not form their own atomic compaction unit.
+	// Initialize the count of such files.
+	// TODO(jackson): Remove this after CockroachDB 22.2.
+	if d.mu.formatVers.vers < FormatSplitUserKeys {
+		count, _, _, _ := findSplitUserKey(d.opts, d.mu.versions.currentVersion())
+		d.mu.compact.nonatomicFileCount = count
+	}
+
 	d.maybeScheduleFlush()
 	d.maybeScheduleCompaction()
diff --git a/open_test.go b/open_test.go
index f5bc45af80..520311d0ba 100644
--- a/open_test.go
+++ b/open_test.go
@@ -101,7 +101,7 @@ func TestNewDBFilenames(t *testing.T) {
 				"LOCK",
 				"MANIFEST-000001",
 				"OPTIONS-000003",
-				"marker.format-version.000005.006",
+				"marker.format-version.000006.007",
 				"marker.manifest.000001.MANIFEST-000001",
 			},
 		},
diff --git a/testdata/checkpoint b/testdata/checkpoint
index f35759ef82..7c4d25b09e 100644
--- a/testdata/checkpoint
+++ b/testdata/checkpoint
@@ -34,6 +34,9 @@ sync: db
 create: db/marker.format-version.000005.006
 close: db/marker.format-version.000005.006
 sync: db
+create: db/marker.format-version.000006.007
+close: db/marker.format-version.000006.007
+sync: db
 sync: db/MANIFEST-000001
 create: db/000002.log
 sync: db
@@ -99,9 +102,9 @@ close:
 open-dir: checkpoints/checkpoint1
 link: db/OPTIONS-000003 -> checkpoints/checkpoint1/OPTIONS-000003
 open-dir: checkpoints/checkpoint1
-create: checkpoints/checkpoint1/marker.format-version.000001.006
-sync: checkpoints/checkpoint1/marker.format-version.000001.006
-close: checkpoints/checkpoint1/marker.format-version.000001.006
+create: checkpoints/checkpoint1/marker.format-version.000001.007
+sync: checkpoints/checkpoint1/marker.format-version.000001.007
+close: checkpoints/checkpoint1/marker.format-version.000001.007
 sync: checkpoints/checkpoint1
 close: checkpoints/checkpoint1
 create: checkpoints/checkpoint1/MANIFEST-000001
@@ -156,7 +159,7 @@ CURRENT
 LOCK
 MANIFEST-000001
 OPTIONS-000003
-marker.format-version.000005.006
+marker.format-version.000006.007
 marker.manifest.000001.MANIFEST-000001
 
 list checkpoints/checkpoint1
@@ -166,7 +169,7 @@ list checkpoints/checkpoint1
 000007.sst
 MANIFEST-000001
 OPTIONS-000003
-marker.format-version.000001.006
+marker.format-version.000001.007
 marker.manifest.000001.MANIFEST-000001
 
 open checkpoints/checkpoint1 readonly
diff --git a/testdata/event_listener b/testdata/event_listener
index 67f3369b24..f3fba48f89 100644
--- a/testdata/event_listener
+++ b/testdata/event_listener
@@ -42,6 +42,10 @@ create: db/marker.format-version.000005.006
 close: db/marker.format-version.000005.006
 sync: db
 upgraded to format version: 006
+create: db/marker.format-version.000006.007
+close: db/marker.format-version.000006.007
+sync: db
+upgraded to format version: 007
 create: db/MANIFEST-000003
 close: db/MANIFEST-000001
 sync: db/MANIFEST-000003
@@ -182,7 +186,7 @@ __level_____count____size___score______in__ingest(sz_cnt)____move(sz_cnt)___writ
   total         3   2.3 K       -   933 B   825 B       1     0 B       0   3.9 K       4   1.5 K       3     4.3
   flush         3
 compact         1   2.3 K     0 B       0  (size == estimated-debt, score = in-progress-bytes, in = num-in-progress)
-  ctype         1       0       0       0       0  (default, delete, elision, move, read)
+  ctype         1       0       0       0       0       0  (default, delete, elision, move, read, rewrite)
  memtbl         1   256 K
 zmemtbl         0     0 B
    ztbl         0     0 B
@@ -209,9 +213,9 @@ close:
 open-dir: checkpoint
 link: db/OPTIONS-000004 -> checkpoint/OPTIONS-000004
 open-dir: checkpoint
-create: checkpoint/marker.format-version.000001.006
-sync: checkpoint/marker.format-version.000001.006
-close: checkpoint/marker.format-version.000001.006
+create: checkpoint/marker.format-version.000001.007
+sync: checkpoint/marker.format-version.000001.007
+close: checkpoint/marker.format-version.000001.007
 sync: checkpoint
 close: checkpoint
 create: checkpoint/MANIFEST-000017
diff --git a/testdata/ingest b/testdata/ingest
index 96f873771f..eaa5c1b4fc 100644
--- a/testdata/ingest
+++ b/testdata/ingest
@@ -43,7 +43,7 @@ __level_____count____size___score______in__ingest(sz_cnt)____move(sz_cnt)___writ
   total         1   833 B       -   833 B   833 B       1     0 B       0   833 B       0     0 B       1     1.0
   flush         0
 compact         0     0 B     0 B       0  (size == estimated-debt, score = in-progress-bytes, in = num-in-progress)
-  ctype         0       0       0       0       0  (default, delete, elision, move, read)
+  ctype         0       0       0       0       0       0  (default, delete, elision, move, read, rewrite)
  memtbl         1   256 K
 zmemtbl         0     0 B
    ztbl         0     0 B
diff --git a/testdata/ingest_load b/testdata/ingest_load
index 4b082c3cb3..802bd9a6d0 100644
--- a/testdata/ingest_load
+++ b/testdata/ingest_load
@@ -89,11 +89,11 @@ a.RANGEDEL.0:b
 ranges: #0,0-#0,0
 
 # Loading tables at an unsupported table format results in an error.
-# Write a table at version 6 (Pebble,v2) into a DB at version 5 (Pebble,v1).
-load writer-version=6 db-version=5
+# Write a table at version 7 (Pebble,v2) into a DB at version 6 (Pebble,v1).
+load writer-version=7 db-version=6
 a.SET.1:
 ----
-pebble: table with format (Pebble,v2) unsupported at DB format major version 5, (Pebble,v1)
+pebble: table with format (Pebble,v2) unsupported at DB format major version 6, (Pebble,v1)
 
 # Tables with range keys only.
diff --git a/testdata/metrics b/testdata/metrics
index b4f9891b8d..adcf53ee55 100644
--- a/testdata/metrics
+++ b/testdata/metrics
@@ -29,7 +29,7 @@ __level_____count____size___score______in__ingest(sz_cnt)____move(sz_cnt)___writ
   total         1   771 B       -    56 B     0 B       0     0 B       0   827 B       1     0 B       1    14.8
   flush         1
 compact         0     0 B     0 B       0  (size == estimated-debt, score = in-progress-bytes, in = num-in-progress)
-  ctype         0       0       0       0       0  (default, delete, elision, move, read)
+  ctype         0       0       0       0       0       0  (default, delete, elision, move, read, rewrite)
  memtbl         1   256 K
 zmemtbl         1   256 K
    ztbl         0     0 B
@@ -77,7 +77,7 @@ __level_____count____size___score______in__ingest(sz_cnt)____move(sz_cnt)___writ
   total         1   778 B       -    84 B     0 B       0     0 B       0   2.3 K       3   1.5 K       1    28.6
   flush         2
 compact         1     0 B     0 B       0  (size == estimated-debt, score = in-progress-bytes, in = num-in-progress)
-  ctype         1       0       0       0       0  (default, delete, elision, move, read)
+  ctype         1       0       0       0       0       0  (default, delete, elision, move, read, rewrite)
  memtbl         1   256 K
 zmemtbl         2   512 K
    ztbl         2   1.5 K
@@ -110,7 +110,7 @@ __level_____count____size___score______in__ingest(sz_cnt)____move(sz_cnt)___writ
   total         1   778 B       -    84 B     0 B       0     0 B       0   2.3 K       3   1.5 K       1    28.6
   flush         2
 compact         1     0 B     0 B       0  (size == estimated-debt, score = in-progress-bytes, in = num-in-progress)
-  ctype         1       0       0       0       0  (default, delete, elision, move, read)
+  ctype         1       0       0       0       0       0  (default, delete, elision, move, read, rewrite)
  memtbl         1   256 K
 zmemtbl         1   256 K
    ztbl         2   1.5 K
@@ -140,7 +140,7 @@ __level_____count____size___score______in__ingest(sz_cnt)____move(sz_cnt)___writ
   total         1   778 B       -    84 B     0 B       0     0 B       0   2.3 K       3   1.5 K       1    28.6
   flush         2
 compact         1     0 B     0 B       0  (size == estimated-debt, score = in-progress-bytes, in = num-in-progress)
-  ctype         1       0       0       0       0  (default, delete, elision, move, read)
+  ctype         1       0       0       0       0       0  (default, delete, elision, move, read, rewrite)
  memtbl         1   256 K
 zmemtbl         1   256 K
    ztbl         1   771 B
@@ -173,7 +173,7 @@ __level_____count____size___score______in__ingest(sz_cnt)____move(sz_cnt)___writ
   total         1   778 B       -    84 B     0 B       0     0 B       0   2.3 K       3   1.5 K       1    28.6
   flush         2
 compact         1     0 B     0 B       0  (size == estimated-debt, score = in-progress-bytes, in = num-in-progress)
-  ctype         1       0       0       0       0  (default, delete, elision, move, read)
+  ctype         1       0       0       0       0       0  (default, delete, elision, move, read, rewrite)
  memtbl         1   256 K
 zmemtbl         0     0 B
    ztbl         0     0 B
diff --git a/testdata/rangekeys b/testdata/rangekeys
index 98eb0bce45..7a3b4311cd 100644
--- a/testdata/rangekeys
+++ b/testdata/rangekeys
@@ -508,24 +508,24 @@ x: valid (., [x-z) @5=boop)
 x: valid (., [x-z) @5=boop)
 
 # Applying range keys to a DB running with a version that doesn't support them
-# results in an error. Range keys were added in version 6.
-reset format-major-version=5
+# results in an error. Range keys were added in version 7.
+reset format-major-version=6
 ----
 
 batch
 range-key-set a z @5 boop
 ----
-pebble: range keys require at least format major version 6 (current: 5)
+pebble: range keys require at least format major version 7 (current: 6)
 
 # Constructing iterator over range keys on a DB that doesn't support them
 # results in an error.
-reset format-major-version=5
+reset format-major-version=6
 ----
 
 combined-iter
 ----
-pebble: range keys require at least format major version 6 (current: 5)
+pebble: range keys require at least format major version 7 (current: 6)
 
 # Test iterator bounds provided via IterOptions.
diff --git a/testdata/split_user_key_migration b/testdata/split_user_key_migration
new file mode 100644
index 0000000000..0b3951b595
--- /dev/null
+++ b/testdata/split_user_key_migration
@@ -0,0 +1,108 @@
+define
+L1
+d.SET.110:d e.SET.140:e
+----
+1:
+  000004:[d#110,SET-e#140,SET]
+
+reopen
+----
+0 nonatomic-files at open
+
+# The current public Pebble interface offers no way of constructing a multi-file
+# atomic compaction unit, so use the force-ingest command to force an ingestion
+# into L1.
+
+build ef
+set e e
+set f f
+----
+
+force-ingest paths=(ef) level=1
+----
+1:
+  000004:[d#110,SET-e#140,SET]
+  000008:[e#1,SET-f#1,SET]
+
+format-major-version
+----
+005
+
+reopen
+----
+1 nonatomic-files at open
+
+format-major-version
+----
+005
+
+ratchet-format-major-version 006
+----
+[JOB 100] compacted(rewrite) L1 [000004 000008] (1.6 K) + L1 [] (0 B) -> L1 [000012] (786 B), in 1.0s (2.0s total), output rate 786 B/s
+
+format-major-version
+----
+006
+
+lsm
+----
+1:
+  000012:[d#0,SET-f#0,SET]
+
+define
+L1
+b.SET.0:b c.SET.5:c
+L1
+m.SET.0:m l.SET.5:l
+L1
+x.SET.0:x y.SET.5:y
+----
+1:
+  000004:[b#0,SET-c#5,SET]
+  000005:[l#5,SET-m#0,SET]
+  000006:[x#0,SET-y#5,SET]
+
+build ab
+set a a
+set b b
+----
+
+build cd
+set c c
+set d d
+----
+
+build wx
+set w w
+set x x
+----
+
+force-ingest paths=(ab, cd, wx) level=1
+----
+1:
+  000007:[a#1,SET-b#1,SET]
+  000004:[b#0,SET-c#5,SET]
+  000008:[c#2,SET-d#2,SET]
+  000005:[l#5,SET-m#0,SET]
+  000009:[w#3,SET-x#3,SET]
+  000006:[x#0,SET-y#5,SET]
+
+format-major-version
+----
+005
+
+ratchet-format-major-version 006
+----
+[JOB 100] compacted(rewrite) L1 [000007 000004 000008] (2.4 K) + L1 [] (0 B) -> L1 [000010] (794 B), in 1.0s (2.0s total), output rate 794 B/s
+[JOB 100] compacted(rewrite) L1 [000009 000006] (1.6 K) + L1 [] (0 B) -> L1 [000011] (786 B), in 1.0s (2.0s total), output rate 786 B/s
+
+lsm
+----
+1:
+  000010:[a#0,SET-d#0,SET]
+  000005:[l#5,SET-m#0,SET]
+  000011:[w#0,SET-y#0,SET]
+
+format-major-version
+----
+006
diff --git a/tool/testdata/db_lsm b/tool/testdata/db_lsm
index ae4e3cf758..d60f1d8c0d 100644
--- a/tool/testdata/db_lsm
+++ b/tool/testdata/db_lsm
@@ -22,7 +22,7 @@ __level_____count____size___score______in__ingest(sz_cnt)____move(sz_cnt)___writ
   total         1   986 B       -     0 B     0 B       0     0 B       0     0 B       0     0 B       0     0.0
   flush         0
 compact         0     0 B     0 B       0  (size == estimated-debt, score = in-progress-bytes, in = num-in-progress)
-  ctype         0       0       0       0       0  (default, delete, elision, move, read)
+  ctype         0       0       0       0       0       0  (default, delete, elision, move, read, rewrite)
  memtbl         1   256 K
 zmemtbl         0     0 B
    ztbl         0     0 B
diff --git a/version_set.go b/version_set.go
index 18b655e28f..72dffb2046 100644
--- a/version_set.go
+++ b/version_set.go
@@ -550,6 +550,10 @@ func (vs *versionSet) incrementCompactions(kind compactionKind) {
 	case compactionKindRead:
 		vs.metrics.Compact.Count++
 		vs.metrics.Compact.ReadCount++
+
+	case compactionKindRewrite:
+		vs.metrics.Compact.Count++
+		vs.metrics.Compact.RewriteCount++
 	}
 }