From 93d4722618928842776e5a6ca09b0b99c0ac70cd Mon Sep 17 00:00:00 2001 From: Jackson Owens Date: Fri, 25 Feb 2022 13:33:14 -0500 Subject: [PATCH] db: add migration to rewrite atomic compaction units Add two new format major versions FormatSplitUserKeysMarked and FormatMarkCompacted that together may be used to guarantee that all sstables within the table form their own atomic compaction unit. Previous versions of Pebble (before #1470) and RocksDB allowed keys with identical user keys to be split across multiple tables within a level. This produced a set of files known as an 'atomic compaction unit' that must be compacted together to preserve the LSM invariant. The first format major version FormatSplitUserKeysMarked may be used to guarantee that every file that is member of a multi-file atomic compaction unit is marked for compaction. The 'marked-for-compaction' file metadata field is currently unused by Pebble, but is repurposed here to record the intent to recompact these files with split user keys. If ratcheting to FormatSplitUserKeysMarked discovers files with split user keys, it marks them for compaction and then rotates the manifest to ensure that the updated file metadata is persisted durably. This commit introduces a new rewrite compaction type. During compaction picking if no other productive compaction is picked, the compaction picker looks for files marked for compaction. It uses a manifest.Annotator to avoid a linear search through the file metadata. If a marked file exists, it picks a compaction that outputs into the file's existing level, pulling in its atomic compaction unit if necessary. The second format major version FormatMarkCompacted is used to guarantee that no files that are marked for compaction exist. This may be used in a subequent CockroachDB release (22.2) to ensure that all files marked for compaction by the FormatSplitUserKeysMarked format major version have been compacted away. Ratcheting to this format major version blocks until all the marked files are compacted. Together these format major versions will allow us to remove code necessary to handle these atomic compaction units, when we increase the minimum format major version supported by Pebble. Close #1495. --- compaction.go | 13 +- compaction_picker.go | 126 ++++++++++++++++- compaction_picker_test.go | 16 +++ compaction_test.go | 107 ++++++++++++++ data_test.go | 30 +++- db.go | 6 + flush_external.go | 2 +- format_major_version.go | 187 ++++++++++++++++++++++++- format_major_version_test.go | 113 +++++++++++++++ ingest.go | 21 ++- internal/manifest/btree.go | 20 +++ internal/manifest/level_metadata.go | 34 +++-- internal/manifest/version.go | 31 +++- internal/manifest/version_edit.go | 48 +++++-- internal/manifest/version_edit_test.go | 2 +- metrics.go | 10 +- metrics_test.go | 5 +- open.go | 3 +- open_test.go | 2 +- testdata/checkpoint | 16 ++- testdata/compaction_picker_L0 | 22 +++ testdata/event_listener | 74 +++++----- testdata/ingest | 2 +- testdata/ingest_load | 6 +- testdata/marked_for_compaction | 28 ++++ testdata/metrics | 10 +- testdata/rangekeys | 10 +- testdata/split_user_key_migration | 148 +++++++++++++++++++ tool/testdata/db_lsm | 2 +- version_set.go | 7 +- 30 files changed, 997 insertions(+), 104 deletions(-) create mode 100644 testdata/marked_for_compaction create mode 100644 testdata/split_user_key_migration diff --git a/compaction.go b/compaction.go index 9d21edc2f7..c50bebf6cf 100644 --- a/compaction.go +++ b/compaction.go @@ -285,6 +285,7 @@ const ( compactionKindDeleteOnly compactionKindElisionOnly compactionKindRead + compactionKindRewrite ) func (k compactionKind) String() string { @@ -301,6 +302,8 @@ func (k compactionKind) String() string { return "elision-only" case compactionKindRead: return "read" + case compactionKindRewrite: + return "rewrite" } return "?" } @@ -320,9 +323,9 @@ type compaction struct { // startLevel is the level that is being compacted. Inputs from startLevel // and outputLevel will be merged to produce a set of outputLevel files. startLevel *compactionLevel - // outputLevel is the level that files are being produced in. outputLevel is - // equal to startLevel+1 except when startLevel is 0 in which case it is - // equal to compactionPicker.baseLevel(). + // outputLevel is the level that files are being produced in. For default + // compactions, outputLevel is equal to startLevel+1 except when startLevel + // is 0 in which case it is equal to compactionPicker.baseLevel(). outputLevel *compactionLevel inputs []compactionLevel @@ -1433,7 +1436,7 @@ func (d *DB) flush1() error { } d.mu.versions.logLock() - err = d.mu.versions.logAndApply(jobID, ve, c.metrics, + err = d.mu.versions.logAndApply(jobID, ve, c.metrics, false, /* forceRotation */ func() []compactionInfo { return d.getInProgressCompactionInfoLocked(c) }) if err != nil { // TODO(peter): untested. @@ -1857,7 +1860,7 @@ func (d *DB) compact1(c *compaction, errChannel chan error) (err error) { info.Duration = d.timeNow().Sub(startTime) if err == nil { d.mu.versions.logLock() - err = d.mu.versions.logAndApply(jobID, ve, c.metrics, func() []compactionInfo { + err = d.mu.versions.logAndApply(jobID, ve, c.metrics, false /* forceRotation */, func() []compactionInfo { return d.getInProgressCompactionInfoLocked(c) }) if err != nil { diff --git a/compaction_picker.go b/compaction_picker.go index ed518fdb82..761f57ce7c 100644 --- a/compaction_picker.go +++ b/compaction_picker.go @@ -37,6 +37,7 @@ type compactionPicker interface { pickAuto(env compactionEnv) (pc *pickedCompaction) pickManual(env compactionEnv, manual *manualCompaction) (c *pickedCompaction, retryLater bool) pickElisionOnlyCompaction(env compactionEnv) (pc *pickedCompaction) + pickRewriteCompaction(env compactionEnv) (pc *pickedCompaction) pickReadTriggeredCompaction(env compactionEnv) (pc *pickedCompaction) forceBaseLevel1() } @@ -109,9 +110,9 @@ type pickedCompaction struct { // startLevel is the level that is being compacted. Inputs from startLevel // and outputLevel will be merged to produce a set of outputLevel files. startLevel *compactionLevel - // outputLevel is the level that files are being produced in. outputLevel is - // equal to startLevel+1 except when startLevel is 0 in which case it is - // equal to compactionPicker.baseLevel(). + // outputLevel is the level that files are being produced in. In default + // compactions, outputLevel is equal to startLevel+1 except when startLevel + // is 0 in which case it is equal to compactionPicker.baseLevel(). outputLevel *compactionLevel // adjustedOutputLevel is the output level used for the purpose of // determining the target output file size, overlap bytes, and expanded @@ -1066,6 +1067,25 @@ func (p *compactionPickerByScore) pickAuto(env compactionEnv) (pc *pickedCompact *env.readCompactionEnv.rescheduleReadCompaction = true } + // At the lowest possible compaction-picking priority, look for files marked + // for compaction. Pebble will mark files for compaction if they have atomic + // compaction units that span multiple files. While current Pebble code does + // not construct such sstables, RocksDB and earlier versions of Pebble may + // have created them. These split user keys form sets of files that must be + // compacted together for correctness (referred to as "atomic compaction + // units" within the code). Rewrite them in-place. + // + // It's also possible that a file may have been marked for compaction by + // even earlier versions of Pebble code, since FileMetadata's + // MarkedForCompaction field is persisted in the manifest. That's okay. We + // previously would've ignored the designation, whereas now we'll re-compact + // the file in place. + if p.vers.Stats.MarkedForCompaction > 0 { + if pc := p.pickRewriteCompaction(env); pc != nil { + return pc + } + } + return nil } @@ -1129,6 +1149,46 @@ func (a elisionOnlyAnnotator) Merge(v interface{}, accum interface{}) interface{ return accumV } +// markedForCompactionAnnotator implements the manifest.Annotator interface, +// annotating B-Tree nodes with the *fileMetadata of a file that is marked for +// compaction within the subtree. If multiple files meet the criteria, it +// chooses whichever file has the lowest LargestSeqNum. +type markedForCompactionAnnotator struct{} + +var _ manifest.Annotator = markedForCompactionAnnotator{} + +func (a markedForCompactionAnnotator) Zero(interface{}) interface{} { + return nil +} + +func (a markedForCompactionAnnotator) Accumulate( + f *fileMetadata, dst interface{}, +) (interface{}, bool) { + if !f.MarkedForCompaction { + // Not marked for compaction; return dst. + return dst, true + } + return markedMergeHelper(f, dst) +} + +func (a markedForCompactionAnnotator) Merge(v interface{}, accum interface{}) interface{} { + if v == nil { + return accum + } + accum, _ = markedMergeHelper(v.(*fileMetadata), accum) + return accum +} + +// REQUIRES: f is non-nil, and f.MarkedForCompaction=true. +func markedMergeHelper(f *fileMetadata, dst interface{}) (interface{}, bool) { + if dst == nil { + return f, true + } else if dstV := dst.(*fileMetadata); dstV.LargestSeqNum > f.LargestSeqNum { + return f, true + } + return dst, true +} + // pickElisionOnlyCompaction looks for compactions of sstables in the // bottommost level containing obsolete records that may now be dropped. func (p *compactionPickerByScore) pickElisionOnlyCompaction( @@ -1164,6 +1224,66 @@ func (p *compactionPickerByScore) pickElisionOnlyCompaction( return nil } +// pickRewriteCompaction attempts to construct a compaction that +// rewrites a file marked for compaction. pickRewriteCompaction will +// pull in adjacent files in the file's atomic compaction unit if +// necessary. A rewrite compaction outputs files to the same level as +// the input level. +func (p *compactionPickerByScore) pickRewriteCompaction(env compactionEnv) (pc *pickedCompaction) { + for l := numLevels - 1; l >= 0; l-- { + v := p.vers.Levels[l].Annotation(markedForCompactionAnnotator{}) + if v == nil { + // Try the next level. + continue + } + candidate := v.(*fileMetadata) + if candidate.Compacting { + // Try the next level. + continue + } + lf := p.vers.Levels[l].Find(p.opts.Comparer.Compare, candidate) + if lf == nil { + panic(fmt.Sprintf("file %s not found in level %d as expected", candidate.FileNum, numLevels-1)) + } + + inputs := lf.Slice() + // L0 files generated by a flush have never been split such that + // adjacent files can contain the same user key. So we do not need to + // rewrite an atomic compaction unit for L0. Note that there is nothing + // preventing two different flushes from producing files that are + // non-overlapping from an InternalKey perspective, but span the same + // user key. However, such files cannot be in the same L0 sublevel, + // since each sublevel requires non-overlapping user keys (unlike other + // levels). + if l > 0 { + // Find this file's atomic compaction unit. This is only relevant + // for levels L1+. + var isCompacting bool + inputs, isCompacting = expandToAtomicUnit( + p.opts.Comparer.Compare, + inputs, + false, /* disableIsCompacting */ + ) + if isCompacting { + // Try the next level. + continue + } + } + + pc = newPickedCompaction(p.opts, p.vers, l, l, p.baseLevel) + pc.outputLevel.level = l + pc.kind = compactionKindRewrite + pc.startLevel.files = inputs + pc.smallest, pc.largest = manifest.KeyRange(pc.cmp, pc.startLevel.files.Iter()) + + // Fail-safe to protect against compacting the same sstable concurrently. + if !inputRangeAlreadyCompacting(env, pc) { + return pc + } + } + return nil +} + // pickAutoLPositive picks an automatic compaction for the candidate // file in a positive-numbered level. This function must not be used for // L0. diff --git a/compaction_picker_test.go b/compaction_picker_test.go index eba520a78a..297f7bcdde 100644 --- a/compaction_picker_test.go +++ b/compaction_picker_test.go @@ -536,6 +536,22 @@ func TestCompactionPickerL0(t *testing.T) { return "nil" } return result.String() + case "mark-for-compaction": + var fileNum uint64 + td.ScanArgs(t, "file", &fileNum) + for l, lm := range picker.vers.Levels { + iter := lm.Iter() + for f := iter.First(); f != nil; f = iter.Next() { + if f.FileNum != base.FileNum(fileNum) { + continue + } + f.MarkedForCompaction = true + picker.vers.Stats.MarkedForCompaction++ + picker.vers.Levels[l].InvalidateAnnotation(markedForCompactionAnnotator{}) + return fmt.Sprintf("marked L%d.%s", l, f.FileNum) + } + } + return "not-found" case "max-output-file-size": if pc == nil { return "no compaction" diff --git a/compaction_test.go b/compaction_test.go index 7225efae99..e9ee8f3963 100644 --- a/compaction_test.go +++ b/compaction_test.go @@ -96,6 +96,12 @@ func (p *compactionPickerForTesting) pickElisionOnlyCompaction( return nil } +func (p *compactionPickerForTesting) pickRewriteCompaction( + env compactionEnv, +) (pc *pickedCompaction) { + return nil +} + func (p *compactionPickerForTesting) pickManual( env compactionEnv, manual *manualCompaction, ) (pc *pickedCompaction, retryLater bool) { @@ -3348,3 +3354,104 @@ func Test_calculateInuseKeyRanges(t *testing.T) { }) } } + +func TestMarkedForCompaction(t *testing.T) { + var mem vfs.FS = vfs.NewMem() + var d *DB + defer func() { + if d != nil { + require.NoError(t, d.Close()) + } + }() + + var buf bytes.Buffer + opts := &Options{ + FS: mem, + DebugCheck: DebugCheckLevels, + DisableAutomaticCompactions: true, + FormatMajorVersion: FormatNewest, + EventListener: EventListener{ + CompactionEnd: func(info CompactionInfo) { + info.JobID = 100 // Fix to avoid nondeterminism. + fmt.Fprintln(&buf, info) + }, + }, + } + + reset := func() { + if d != nil { + require.NoError(t, d.Close()) + } + mem = vfs.NewMem() + require.NoError(t, mem.MkdirAll("ext", 0755)) + + var err error + d, err = Open("", opts) + require.NoError(t, err) + } + datadriven.RunTest(t, "testdata/marked_for_compaction", func(td *datadriven.TestData) string { + switch td.Cmd { + case "reset": + reset() + return "" + + case "define": + if d != nil { + if err := d.Close(); err != nil { + return err.Error() + } + } + var err error + if d, err = runDBDefineCmd(td, opts); err != nil { + return err.Error() + } + d.mu.Lock() + defer d.mu.Unlock() + t := time.Now() + d.timeNow = func() time.Time { + t = t.Add(time.Second) + return t + } + s := d.mu.versions.currentVersion().DebugString(base.DefaultFormatter) + return s + + case "mark-for-compaction": + d.mu.Lock() + defer d.mu.Unlock() + vers := d.mu.versions.currentVersion() + var fileNum uint64 + td.ScanArgs(t, "file", &fileNum) + for l, lm := range vers.Levels { + iter := lm.Iter() + for f := iter.First(); f != nil; f = iter.Next() { + if f.FileNum != base.FileNum(fileNum) { + continue + } + f.MarkedForCompaction = true + vers.Stats.MarkedForCompaction++ + vers.Levels[l].InvalidateAnnotation(markedForCompactionAnnotator{}) + return fmt.Sprintf("marked L%d.%s", l, f.FileNum) + } + } + return "not-found" + + case "maybe-compact": + d.mu.Lock() + defer d.mu.Unlock() + d.opts.DisableAutomaticCompactions = false + d.maybeScheduleCompaction() + for d.mu.compact.compactingCount > 0 { + d.mu.compact.cond.Wait() + } + + fmt.Fprintln(&buf, d.mu.versions.currentVersion().DebugString(base.DefaultFormatter)) + s := strings.TrimSpace(buf.String()) + buf.Reset() + opts.DisableAutomaticCompactions = true + return s + + default: + return fmt.Sprintf("unknown command: %s", td.Cmd) + } + }) +} diff --git a/data_test.go b/data_test.go index b9e1173785..4470ab1faf 100644 --- a/data_test.go +++ b/data_test.go @@ -729,7 +729,7 @@ func runDBDefineCmd(td *datadriven.TestData, opts *Options) (*DB, error) { jobID := d.mu.nextJobID d.mu.nextJobID++ d.mu.versions.logLock() - if err := d.mu.versions.logAndApply(jobID, ve, newFileMetrics(ve.NewFiles), func() []compactionInfo { + if err := d.mu.versions.logAndApply(jobID, ve, newFileMetrics(ve.NewFiles), false, func() []compactionInfo { return nil }); err != nil { return nil, err @@ -822,6 +822,34 @@ func runIngestCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error { return nil } +func runForceIngestCmd(td *datadriven.TestData, d *DB) error { + var paths []string + var level int + for _, arg := range td.CmdArgs { + switch arg.Key { + case "paths": + paths = append(paths, arg.Vals...) + case "level": + var err error + level, err = strconv.Atoi(arg.Vals[0]) + if err != nil { + return err + } + } + } + return d.ingest(paths, func( + tableNewIters, + IterOptions, + Compare, + *version, + int, + map[*compaction]struct{}, + *fileMetadata, + ) (int, error) { + return level, nil + }) +} + func runLSMCmd(td *datadriven.TestData, d *DB) string { d.mu.Lock() s := d.mu.versions.currentVersion().String() diff --git a/db.go b/db.go index d8ecd83f0e..47c9f87256 100644 --- a/db.go +++ b/db.go @@ -320,6 +320,11 @@ type DB struct { // marker is moved in order to atomically record the new // version. marker *atomicfs.Marker + // ratcheting when set to true indicates that the database is + // currently in the process of ratcheting the format major version + // to vers + 1. As a part of ratcheting the format major version, + // migrations may drop and re-acquire the mutex. + ratcheting bool } // The ID of the next job. Job IDs are passed to event listener @@ -1394,6 +1399,7 @@ func (d *DB) Metrics() *Metrics { metrics.Compact.EstimatedDebt = d.mu.versions.picker.estimatedCompactionDebt(0) metrics.Compact.InProgressBytes = atomic.LoadInt64(&d.mu.versions.atomic.atomicInProgressBytes) metrics.Compact.NumInProgress = int64(d.mu.compact.compactingCount) + metrics.Compact.MarkedFiles = d.mu.versions.currentVersion().Stats.MarkedForCompaction for _, m := range d.mu.mem.queue { metrics.MemTable.Size += m.totalBytes() } diff --git a/flush_external.go b/flush_external.go index c9530b03d9..179beb2127 100644 --- a/flush_external.go +++ b/flush_external.go @@ -74,7 +74,7 @@ func flushExternalTable(untypedDB interface{}, path string, originalMeta *fileMe TablesIngested: 1, }, } - err := d.mu.versions.logAndApply(jobID, ve, metrics, func() []compactionInfo { + err := d.mu.versions.logAndApply(jobID, ve, metrics, false /* forceRotation */, func() []compactionInfo { return d.getInProgressCompactionInfoLocked(nil) }) if err != nil { diff --git a/format_major_version.go b/format_major_version.go index 517074382d..1246d1333f 100644 --- a/format_major_version.go +++ b/format_major_version.go @@ -10,6 +10,7 @@ import ( "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/internal/base" + "github.com/cockroachdb/pebble/internal/manifest" "github.com/cockroachdb/pebble/sstable" "github.com/cockroachdb/pebble/vfs" "github.com/cockroachdb/pebble/vfs/atomicfs" @@ -70,6 +71,20 @@ const ( // FormatBlockPropertyCollector is a format major version that introduces // BlockPropertyCollectors. FormatBlockPropertyCollector + // FormatSplitUserKeysMarked is a format major version that guarantees that + // all files that share user keys with neighbors are marked for compaction + // in the manifest. Ratcheting to FormatSplitUserKeysMarked will block + // (without holding mutexes) until the scan of the LSM is complete and the + // manifest has been rotated. + FormatSplitUserKeysMarked + // FormatMarkedCompacted is a format major version that guarantees that all + // files explicitly marked for compaction in the manifest have been + // compacted. Combined with the FormatSplitUserKeysMarked format major + // version, this version guarantees that there are no user keys split across + // multiple files within a level L1+. Ratcheting to this format version will + // block (without holding mutexes) until all necessary compactions for files + // marked for compaction are complete. + FormatMarkedCompacted // FormatRangeKeys is a format major version that introduces range keys. FormatRangeKeys // FormatNewest always contains the most recent format major version. @@ -86,7 +101,7 @@ func (v FormatMajorVersion) MaxTableFormat() sstable.TableFormat { case FormatDefault, FormatMostCompatible, formatVersionedManifestMarker, FormatVersioned, FormatSetWithDelete: return sstable.TableFormatRocksDBv2 - case FormatBlockPropertyCollector: + case FormatBlockPropertyCollector, FormatSplitUserKeysMarked, FormatMarkedCompacted: return sstable.TableFormatPebblev1 case FormatRangeKeys: return sstable.TableFormatPebblev2 @@ -171,6 +186,23 @@ var formatMajorVersionMigrations = map[FormatMajorVersion]func(*DB) error{ FormatBlockPropertyCollector: func(d *DB) error { return d.finalizeFormatVersUpgrade(FormatBlockPropertyCollector) }, + FormatSplitUserKeysMarked: func(d *DB) error { + // Mark any unmarked files with split-user keys. Note all format major + // versions migrations are invoked with DB.mu locked. + if err := d.markFilesWithSplitUserKeysLocked(); err != nil { + return err + } + return d.finalizeFormatVersUpgrade(FormatSplitUserKeysMarked) + }, + FormatMarkedCompacted: func(d *DB) error { + // Before finalizing the format major version, rewrite any sstables + // still marked for compaction. Note all format major versions + // migrations are invoked with DB.mu locked. + if err := d.compactMarkedFilesLocked(); err != nil { + return err + } + return d.finalizeFormatVersUpgrade(FormatMarkedCompacted) + }, FormatRangeKeys: func(d *DB) error { return d.finalizeFormatVersUpgrade(FormatRangeKeys) }, @@ -240,6 +272,12 @@ func (d *DB) ratchetFormatMajorVersionLocked(formatVers FormatMajorVersion) erro return errors.Newf("pebble: database already at format major version %d; cannot reduce to %d", d.mu.formatVers.vers, formatVers) } + if d.mu.formatVers.ratcheting { + return errors.Newf("pebble: database format major version upgrade is in-progress") + } + d.mu.formatVers.ratcheting = true + defer func() { d.mu.formatVers.ratcheting = false }() + for nextVers := d.mu.formatVers.vers + 1; nextVers <= formatVers; nextVers++ { if err := formatMajorVersionMigrations[nextVers](d); err != nil { return errors.Wrapf(err, "migrating to version %d", nextVers) @@ -274,3 +312,150 @@ func (d *DB) finalizeFormatVersUpgrade(formatVers FormatMajorVersion) error { d.opts.EventListener.FormatUpgrade(formatVers) return nil } + +// compactMarkedFilesLocked performs a migration that schedules rewrite +// compactions to compact away any sstables marked for compaction. +// compactMarkedFilesLocked is run while ratcheting the database's format major +// version to FormatMarkedCompacted. +func (d *DB) compactMarkedFilesLocked() error { + curr := d.mu.versions.currentVersion() + for curr.Stats.MarkedForCompaction > 0 { + // Attempt to schedule a compaction to rewrite a file marked for + // compaction. + d.maybeScheduleCompactionPicker(func(picker compactionPicker, env compactionEnv) *pickedCompaction { + return picker.pickRewriteCompaction(env) + }) + + // The above attempt might succeed and schedule a rewrite compaction. Or + // there might not be available compaction concurrency to schedule the + // compaction. Or compaction of the file might have already been in + // progress. In any scenario, wait until there's some change in the + // state of active compactions. + + // Before waiting, check that the database hasn't been closed. Trying to + // schedule the compaction may have dropped d.mu while waiting for a + // manifest write to complete. In that dropped interim, the database may + // have been closed. + if err := d.closed.Load(); err != nil { + return err.(error) + } + // NB: Waiting on this condition variable drops d.mu while blocked. + d.mu.compact.cond.Wait() + + // Some flush or compaction was scheduled or completed. Loop again to + // check again for files that must be compacted. The next iteration may + // find same file again, but that's okay. It'll eventually succeed in + // scheduling the compaction and eventually be woken by its completion. + curr = d.mu.versions.currentVersion() + } + return nil +} + +// markFilesWithSplitUserKeysLocked scans the LSM's levels 1 through 6 for +// adjacent files that contain the same user key. Such arrangements of files +// were permitted in RocksDB and in Pebble up to SHA a860bbad. +// markFilesWithSplitUserKeysLocked durably marks such files for compaction. +func (d *DB) markFilesWithSplitUserKeysLocked() error { + jobID := d.mu.nextJobID + d.mu.nextJobID++ + + // Files with split user keys are expected to be rare and performing key + // comparisons for every file within the LSM is expensive, so drop the + // database lock while scanning the file metadata. + // + // After scanning, if we found files to mark, we'll re-acquire the mutex + // before marking them, since MarkedForCompaction and the version's + // Stats.MarkedForCompaction are protected by d.mu. + var files [numLevels][]*fileMetadata + var foundSplitUserKey bool + func() { + equal := d.opts.equal() + vers := d.mu.versions.currentVersion() + // Note the unusual locking: unlock, defer Lock(). + d.mu.Unlock() + defer d.mu.Lock() + + for l := numLevels - 1; l > 0; l-- { + iter := vers.Levels[l].Iter() + var prevFile *fileMetadata + var prevUserKey []byte + for f := iter.First(); f != nil; f = iter.Next() { + if prevUserKey != nil && equal(prevUserKey, f.Smallest.UserKey) { + // NB: We may append a file twice, once as prevFile and once + // as f. That's okay, and handled below. + files[l] = append(files[l], prevFile, f) + foundSplitUserKey = true + } + if f.Largest.IsExclusiveSentinel() { + prevUserKey = nil + prevFile = nil + } else { + prevUserKey = f.Largest.UserKey + prevFile = f + } + } + } + }() + + // The database lock has been acquired again by the defer within the above + // anonymous function. + if !foundSplitUserKey { + // Nothing to do. + return nil + } + + // Lock the manifest for a coherent view of the LSM. + d.mu.versions.logLock() + vers := d.mu.versions.currentVersion() + for l, filesToMark := range files { + if len(filesToMark) == 0 { + continue + } + + for _, f := range filesToMark { + if f.MarkedForCompaction { + continue + } + + // We need to determine if the file still exists in the current + // version of the LSM. Note that we're making the assumption that if + // f still exists within the LSM, it's within the same level. That's + // okay, because only move compactions may move files between levels + // and move compactions only apply to compactions with a single file + // in the start level, but we already know these files have + // multi-file atomic compaction units. + if m := vers.Levels[l].Find(d.cmp, f); m != nil { + // It's still there. Mark it for compaction and update the + // version's stats. + f.MarkedForCompaction = true + vers.Stats.MarkedForCompaction++ + } + } + // The compaction picker uses the markedForCompactionAnnotator to + // quickly find files marked for compaction, or to quickly determine + // that there are no such files marked for compaction within a level. + // A b-tree node may be annotated with an annotation recording that + // there are no files marked for compaction within the node's subtree, + // based on the assumption that it's static. + // + // Since we're marking files for compaction, these b-tree nodes' + // annotations will be out of date. Clear the compaction-picking + // annotation, so that it's recomputed the next time the compaction + // picker looks for a file marked for compaction. + vers.Levels[l].InvalidateAnnotation(markedForCompactionAnnotator{}) + } + + // The 'marked-for-compaction' bit is persisted in the MANIFEST file + // metadata. We've already modified the in-memory file metadata, but the + // manifest hasn't been updated. Force rotation to a new MANIFEST file, + // which will write every file metadata to the new manifest file and ensure + // that the now marked-for-compaction file metadata are persisted as marked. + // NB: This call to logAndApply will unlockthe MANIFEST, which we locked up + // above before obtaining `vers`. + return d.mu.versions.logAndApply( + jobID, + &manifest.VersionEdit{}, + map[int]*LevelMetrics{}, + true, /* forceRotation */ + func() []compactionInfo { return d.getInProgressCompactionInfoLocked(nil) }) +} diff --git a/format_major_version_test.go b/format_major_version_test.go index f37a6a9a03..920e6cc42e 100644 --- a/format_major_version_test.go +++ b/format_major_version_test.go @@ -5,9 +5,14 @@ package pebble import ( + "bytes" "fmt" + "strconv" "testing" + "time" + "github.com/cockroachdb/pebble/internal/base" + "github.com/cockroachdb/pebble/internal/datadriven" "github.com/cockroachdb/pebble/sstable" "github.com/cockroachdb/pebble/vfs" "github.com/cockroachdb/pebble/vfs/atomicfs" @@ -191,6 +196,8 @@ func TestFormatMajorVersions_TableFormat(t *testing.T) { FormatVersioned: sstable.TableFormatRocksDBv2, FormatSetWithDelete: sstable.TableFormatRocksDBv2, FormatBlockPropertyCollector: sstable.TableFormatPebblev1, + FormatSplitUserKeysMarked: sstable.TableFormatPebblev1, + FormatMarkedCompacted: sstable.TableFormatPebblev1, FormatRangeKeys: sstable.TableFormatPebblev2, } @@ -204,3 +211,109 @@ func TestFormatMajorVersions_TableFormat(t *testing.T) { fmv := FormatNewest + 1 require.Panics(t, func() { _ = fmv.MaxTableFormat() }) } + +func TestSplitUserKeyMigration(t *testing.T) { + var d *DB + var opts *Options + var fs vfs.FS + var buf bytes.Buffer + defer func() { + if d != nil { + require.NoError(t, d.Close()) + } + }() + + datadriven.RunTest(t, "testdata/split_user_key_migration", + func(td *datadriven.TestData) string { + switch td.Cmd { + case "define": + if d != nil { + if err := d.Close(); err != nil { + return err.Error() + } + buf.Reset() + } + opts = &Options{ + FormatMajorVersion: FormatBlockPropertyCollector, + EventListener: EventListener{ + CompactionEnd: func(info CompactionInfo) { + // JobID's aren't deterministic, especially w/ table stats + // enabled. Use a fixed job ID for data-driven test output. + info.JobID = 100 + fmt.Fprintln(&buf, info) + }, + }, + } + var err error + if d, err = runDBDefineCmd(td, opts); err != nil { + return err.Error() + } + + // Mock time so that we get consistent log output written to + // buf. + t := time.Now() + d.timeNow = func() time.Time { + t = t.Add(time.Second) + return t + } + fs = d.opts.FS + d.mu.Lock() + defer d.mu.Unlock() + return d.mu.versions.currentVersion().DebugString(base.DefaultFormatter) + case "reopen": + if d != nil { + if err := d.Close(); err != nil { + return err.Error() + } + buf.Reset() + } + opts.FS = fs + var err error + d, err = Open("", opts) + if err != nil { + return err.Error() + } + // Mock time so that we get consistent log output written to + // buf. + d.mu.Lock() + defer d.mu.Unlock() + t := time.Now() + d.timeNow = func() time.Time { + t = t.Add(time.Second) + return t + } + return "OK" + case "build": + if err := runBuildCmd(td, d, fs); err != nil { + return err.Error() + } + return "" + case "force-ingest": + if err := runForceIngestCmd(td, d); err != nil { + return err.Error() + } + d.mu.Lock() + defer d.mu.Unlock() + return d.mu.versions.currentVersion().DebugString(base.DefaultFormatter) + case "format-major-version": + return d.FormatMajorVersion().String() + case "ratchet-format-major-version": + v, err := strconv.Atoi(td.CmdArgs[0].String()) + if err != nil { + return err.Error() + } + if err := d.RatchetFormatMajorVersion(FormatMajorVersion(v)); err != nil { + return err.Error() + } + return buf.String() + case "lsm": + return runLSMCmd(td, d) + case "marked-file-count": + m := d.Metrics() + return fmt.Sprintf("%d files marked for compaction", m.Compact.MarkedFiles) + default: + return fmt.Sprintf("unrecognized command %q", td.Cmd) + } + + }) +} diff --git a/ingest.go b/ingest.go index 641c6e5d22..d27796263e 100644 --- a/ingest.go +++ b/ingest.go @@ -584,7 +584,10 @@ func (d *DB) Ingest(paths []string) error { if d.opts.ReadOnly { return ErrReadOnly } + return d.ingest(paths, ingestTargetLevel) +} +func (d *DB) ingest(paths []string, targetLevelFunc ingestTargetLevelFunc) error { // Allocate file numbers for all of the files being ingested and mark them as // pending in order to prevent them from being deleted. Note that this causes // the file number ordering to be out of alignment with sequence number @@ -681,7 +684,7 @@ func (d *DB) Ingest(paths []string) error { // Assign the sstables to the correct level in the LSM and apply the // version edit. - ve, err = d.ingestApply(jobID, meta) + ve, err = d.ingestApply(jobID, meta, targetLevelFunc) } d.commit.AllocateSeqNum(len(meta), prepare, apply) @@ -719,7 +722,17 @@ func (d *DB) Ingest(paths []string) error { return err } -func (d *DB) ingestApply(jobID int, meta []*fileMetadata) (*versionEdit, error) { +type ingestTargetLevelFunc func( + newIters tableNewIters, + iterOps IterOptions, + cmp Compare, + v *version, + baseLevel int, + compactions map[*compaction]struct{}, + meta *fileMetadata, +) (int, error) + +func (d *DB) ingestApply(jobID int, meta []*fileMetadata, findTargetLevel ingestTargetLevelFunc) (*versionEdit, error) { d.mu.Lock() defer d.mu.Unlock() @@ -744,7 +757,7 @@ func (d *DB) ingestApply(jobID int, meta []*fileMetadata) (*versionEdit, error) m := meta[i] f := &ve.NewFiles[i] var err error - f.Level, err = ingestTargetLevel(d.newIters, iterOps, d.cmp, current, baseLevel, d.mu.compact.inProgress, m) + f.Level, err = findTargetLevel(d.newIters, iterOps, d.cmp, current, baseLevel, d.mu.compact.inProgress, m) if err != nil { d.mu.versions.logUnlock() return nil, err @@ -760,7 +773,7 @@ func (d *DB) ingestApply(jobID int, meta []*fileMetadata) (*versionEdit, error) levelMetrics.BytesIngested += m.Size levelMetrics.TablesIngested++ } - if err := d.mu.versions.logAndApply(jobID, ve, metrics, func() []compactionInfo { + if err := d.mu.versions.logAndApply(jobID, ve, metrics, false /* forceRotation */, func() []compactionInfo { return d.getInProgressCompactionInfoLocked(nil) }); err != nil { return nil, err diff --git a/internal/manifest/btree.go b/internal/manifest/btree.go index 1a1d39256c..13ccaf577b 100644 --- a/internal/manifest/btree.go +++ b/internal/manifest/btree.go @@ -575,6 +575,26 @@ func (n *node) rebalanceOrMerge(i int) { } } +func (n *node) invalidateAnnotation(a Annotator) { + // Find this annotator's annotation on this node. + var annot *annotation + for i := range n.annot { + if n.annot[i].annotator == a { + annot = &n.annot[i] + } + } + + if annot != nil && annot.valid { + annot.valid = false + annot.v = a.Zero(annot.v) + } + if !n.leaf { + for i := int16(0); i <= n.count; i++ { + n.children[i].invalidateAnnotation(a) + } + } +} + func (n *node) annotation(a Annotator) (interface{}, bool) { // Find this annotator's annotation on this node. var annot *annotation diff --git a/internal/manifest/level_metadata.go b/internal/manifest/level_metadata.go index 153b8a014e..6e7f0f6c08 100644 --- a/internal/manifest/level_metadata.go +++ b/internal/manifest/level_metadata.go @@ -14,14 +14,16 @@ import ( // LevelMetadata contains metadata for all of the files within // a level of the LSM. type LevelMetadata struct { - tree btree + level int + tree btree } // clone makes a copy of the level metadata, implicitly increasing the ref // count of every file contained within lm. func (lm *LevelMetadata) clone() LevelMetadata { return LevelMetadata{ - tree: lm.tree.clone(), + level: lm.level, + tree: lm.tree.clone(), } } @@ -35,6 +37,7 @@ func makeLevelMetadata(cmp Compare, level int, files []*FileMetadata) LevelMetad bcmp = btreeCmpSmallestKey(cmp) } var lm LevelMetadata + lm.level = level lm.tree, _ = makeBTree(bcmp, files) return lm } @@ -68,13 +71,16 @@ func (lm *LevelMetadata) Slice() LevelSlice { return LevelSlice{iter: lm.tree.iter(), length: lm.tree.length} } -// Find finds the provided file in the level if it exists. The level must be -// key-sorted (eg, non-L0). +// Find finds the provided file in the level if it exists. func (lm *LevelMetadata) Find(cmp base.Compare, m *FileMetadata) *LevelFile { - // TODO(jackson): Add an assertion that lm is key-sorted. - o := overlaps(lm.Iter(), cmp, m.Smallest.UserKey, - m.Largest.UserKey, m.Largest.IsExclusiveSentinel()) - iter := o.Iter() + iter := lm.Iter() + if lm.level != 0 { + // If lm holds files for levels >0, we can narrow our search by binary + // searching by bounds. + o := overlaps(iter, cmp, m.Smallest.UserKey, + m.Largest.UserKey, m.Largest.IsExclusiveSentinel()) + iter = o.Iter() + } for f := iter.First(); f != nil; f = iter.Next() { if f == m { lf := iter.Take() @@ -97,6 +103,18 @@ func (lm *LevelMetadata) Annotation(annotator Annotator) interface{} { return v } +// InvalidateAnnotation clears any cached annotations defined by Annotator. The +// Annotator is used as the key for pre-calculated values, so equal Annotators +// must be used to clear the appropriate cached annotation. InvalidateAnnotation +// must not be called concurrently, and in practice this is achieved by +// requiring callers to hold DB.mu. +func (lm *LevelMetadata) InvalidateAnnotation(annotator Annotator) { + if lm.Empty() { + return + } + lm.tree.root.invalidateAnnotation(annotator) +} + // LevelFile holds a file's metadata along with its position // within a level of the LSM. type LevelFile struct { diff --git a/internal/manifest/version.go b/internal/manifest/version.go index e57aae5a99..c1ecb6183e 100644 --- a/internal/manifest/version.go +++ b/internal/manifest/version.go @@ -141,10 +141,23 @@ type FileMetadata struct { IsIntraL0Compacting bool // True if the file is actively being compacted. Protected by DB.mu. Compacting bool - // True if user asked us to compact this file. This flag is only set and - // respected by RocksDB but exists here to preserve its value in the - // MANIFEST. - markedForCompaction bool + // True if compaction of this file has been explicitly requested. + // Previously, RocksDB and earlier versions of Pebble allowed this + // flag to be set by a user table property collector. Some earlier + // versions of Pebble respected this flag, while other more recent + // versions ignored this flag. + // + // More recently this flag has been repurposed to facilitate the + // compaction of 'atomic compaction units'. Files marked for + // compaction are compacted in a rewrite compaction at the lowest + // possible compaction priority. + // + // NB: A count of files marked for compaction is maintained on + // Version, and compaction picking reads cached annotations + // determined by this field. + // + // Protected by DB.mu. + MarkedForCompaction bool // HasPointKeys and HasRangeKeys track whether the table contains point and // range keys, respectively. HasPointKeys bool @@ -523,7 +536,7 @@ func NewVersion( // initial B-Tree, we swap out the btreeCmp for the correct one. // TODO(jackson): Adjust or remove the tests and remove this. v.Levels[l].tree, _ = makeBTree(btreeCmpSpecificOrder(files[l]), files[l]) - + v.Levels[l].level = l if l == 0 { v.Levels[l].tree.cmp = btreeCmpSeqNum } else { @@ -590,6 +603,14 @@ type Version struct { // removed. Will be called with list.mu held. Deleted func(obsolete []*FileMetadata) + // Stats holds aggregated stats about the version maintained from + // version to version. + Stats struct { + // MarkedForCompaction records the count of files marked for + // compaction within the version. + MarkedForCompaction int + } + // The list the version is linked into. list *VersionList diff --git a/internal/manifest/version_edit.go b/internal/manifest/version_edit.go index 6829381d68..f1d8a7ec46 100644 --- a/internal/manifest/version_edit.go +++ b/internal/manifest/version_edit.go @@ -257,24 +257,24 @@ func (v *VersionEdit) Decode(r io.Reader) error { v.NewFiles = append(v.NewFiles, NewFileEntry{ Level: level, Meta: &FileMetadata{ - FileNum: fileNum, - Size: size, - CreationTime: int64(creationTime), - SmallestPointKey: smallestPointKey, - LargestPointKey: largestPointKey, - HasPointKeys: true, - SmallestSeqNum: smallestSeqNum, - LargestSeqNum: largestSeqNum, - markedForCompaction: markedForCompaction, + FileNum: fileNum, + Size: size, + CreationTime: int64(creationTime), + SmallestPointKey: smallestPointKey, + LargestPointKey: largestPointKey, + HasPointKeys: true, + SmallestSeqNum: smallestSeqNum, + LargestSeqNum: largestSeqNum, // TODO(travers): For now the smallest and largest keys are pinned to // the smallest and largest point keys, as these are the only types of // keys supported in the manifest. This will need to change when the // manifest is updated to support range keys, which will most likely // leverage a bitset to infer which key types (points or ranges) are // used for the overall smallest and largest keys. - Smallest: smallestPointKey, - Largest: largestPointKey, - boundsSet: true, + Smallest: smallestPointKey, + Largest: largestPointKey, + boundsSet: true, + MarkedForCompaction: markedForCompaction, }, }) @@ -329,7 +329,7 @@ func (v *VersionEdit) Encode(w io.Writer) error { } for _, x := range v.NewFiles { var customFields bool - if x.Meta.markedForCompaction || x.Meta.CreationTime != 0 { + if x.Meta.MarkedForCompaction || x.Meta.CreationTime != 0 { customFields = true e.writeUvarint(tagNewFile4) } else { @@ -349,7 +349,7 @@ func (v *VersionEdit) Encode(w io.Writer) error { n := binary.PutUvarint(buf[:], uint64(x.Meta.CreationTime)) e.writeBytes(buf[:n]) } - if x.Meta.markedForCompaction { + if x.Meta.MarkedForCompaction { e.writeUvarint(customTagNeedsCompaction) e.writeBytes([]byte{1}) } @@ -453,6 +453,10 @@ type BulkVersionEdit struct { // uses AddedByFileNum to correctly populate the BulkVersionEdit's Deleted // field with non-nil *FileMetadata. AddedByFileNum map[base.FileNum]*FileMetadata + + // MarkedForCompactionCountDiff holds the aggregated count of files + // marked for compaction added or removed. + MarkedForCompactionCountDiff int } // Accumulate adds the file addition and deletions in the specified version @@ -475,6 +479,9 @@ func (b *BulkVersionEdit) Accumulate(ve *VersionEdit) error { return base.CorruptionErrorf("pebble: file deleted L%d.%s before it was inserted", df.Level, df.FileNum) } } + if m.MarkedForCompaction { + b.MarkedForCompactionCountDiff-- + } dmap[df.FileNum] = m } @@ -490,6 +497,9 @@ func (b *BulkVersionEdit) Accumulate(ve *VersionEdit) error { if b.AddedByFileNum != nil { b.AddedByFileNum[nf.Meta.FileNum] = nf.Meta } + if nf.Meta.MarkedForCompaction { + b.MarkedForCompactionCountDiff++ + } } return nil } @@ -525,6 +535,16 @@ func (b *BulkVersionEdit) Apply( } v := new(Version) + + // Adjust the count of files marked for compaction. + if curr != nil { + v.Stats.MarkedForCompaction = curr.Stats.MarkedForCompaction + } + v.Stats.MarkedForCompaction += b.MarkedForCompactionCountDiff + if v.Stats.MarkedForCompaction < 0 { + return nil, nil, base.CorruptionErrorf("pebble: version marked for compaction count negative") + } + for level := range v.Levels { if curr == nil || curr.Levels[level].tree.root == nil { v.Levels[level] = makeLevelMetadata(cmp, level, nil /* files */) diff --git a/internal/manifest/version_edit_test.go b/internal/manifest/version_edit_test.go index b968dfd685..37cddf5627 100644 --- a/internal/manifest/version_edit_test.go +++ b/internal/manifest/version_edit_test.go @@ -57,7 +57,7 @@ func TestVersionEditRoundTrip(t *testing.T) { CreationTime: 806040, SmallestSeqNum: 3, LargestSeqNum: 5, - markedForCompaction: true, + MarkedForCompaction: true, }).ExtendPointKeyBounds( cmp, base.DecodeInternalKey([]byte("A\x00\x01\x02\x03\x04\x05\x06\x07")), diff --git a/metrics.go b/metrics.go index e4319ffd92..97153397ed 100644 --- a/metrics.go +++ b/metrics.go @@ -133,6 +133,7 @@ type Metrics struct { ElisionOnlyCount int64 MoveCount int64 ReadCount int64 + RewriteCount int64 // An estimate of the number of bytes that need to be compacted for the LSM // to reach a stable state. EstimatedDebt uint64 @@ -142,6 +143,10 @@ type Metrics struct { InProgressBytes int64 // Number of compactions that are in-progress. NumInProgress int64 + // MarkedFiles is a count of files that are marked for + // compaction. Such files are compacted in a rewrite compaction + // when no other compactions are picked. + MarkedFiles int } Flush struct { @@ -375,12 +380,13 @@ func (m *Metrics) SafeFormat(w redact.SafePrinter, _ rune) { humanize.IEC.Int64(m.Compact.InProgressBytes), redact.Safe(m.Compact.NumInProgress), redact.SafeString("")) - w.Printf(" ctype %9d %7d %7d %7d %7d (default, delete, elision, move, read)\n", + w.Printf(" ctype %9d %7d %7d %7d %7d %7d (default, delete, elision, move, read, rewrite)\n", redact.Safe(m.Compact.DefaultCount), redact.Safe(m.Compact.DeleteOnlyCount), redact.Safe(m.Compact.ElisionOnlyCount), redact.Safe(m.Compact.MoveCount), - redact.Safe(m.Compact.ReadCount)) + redact.Safe(m.Compact.ReadCount), + redact.Safe(m.Compact.RewriteCount)) w.Printf(" memtbl %9d %7s\n", redact.Safe(m.MemTable.Count), humanize.IEC.Uint64(m.MemTable.Size)) diff --git a/metrics_test.go b/metrics_test.go index 6874588d4b..c571b88fd8 100644 --- a/metrics_test.go +++ b/metrics_test.go @@ -27,6 +27,7 @@ func TestMetricsFormat(t *testing.T) { m.Compact.ElisionOnlyCount = 29 m.Compact.MoveCount = 30 m.Compact.ReadCount = 31 + m.Compact.RewriteCount = 32 m.Compact.EstimatedDebt = 6 m.Compact.InProgressBytes = 7 m.Compact.NumInProgress = 2 @@ -84,7 +85,7 @@ __level_____count____size___score______in__ingest(sz_cnt)____move(sz_cnt)___writ total 2807 2.7 K - 2.8 K 2.8 K 2.9 K 2.8 K 2.9 K 8.4 K 5.7 K 2.8 K 28 3.0 flush 8 compact 5 6 B 7 B 2 (size == estimated-debt, score = in-progress-bytes, in = num-in-progress) - ctype 27 28 29 30 31 (default, delete, elision, move, read) + ctype 27 28 29 30 31 32 (default, delete, elision, move, read, rewrite) memtbl 12 11 B zmemtbl 14 13 B ztbl 16 15 B @@ -227,7 +228,7 @@ __level_____count____size___score______in__ingest(sz_cnt)____move(sz_cnt)___writ total 0 0 B - 0 B 0 B 0 0 B 0 0 B 0 0 B 0 0.0 flush 0 compact 0 0 B 0 B 0 (size == estimated-debt, score = in-progress-bytes, in = num-in-progress) - ctype 0 0 0 0 0 (default, delete, elision, move, read) + ctype 0 0 0 0 0 0 (default, delete, elision, move, read, rewrite) memtbl 0 0 B zmemtbl 0 0 B ztbl 0 0 B diff --git a/open.go b/open.go index 5f98df058f..036df5ba61 100644 --- a/open.go +++ b/open.go @@ -237,7 +237,6 @@ func Open(dirname string, opts *Options) (db *DB, _ error) { if err := d.mu.versions.currentVersion().CheckConsistency(dirname, opts.FS); err != nil { return nil, err } - } // If the Options specify a format major version higher than the @@ -370,7 +369,7 @@ func Open(dirname string, opts *Options) (db *DB, _ error) { // crash before the manifest is synced could leave two WALs with // unclean tails. d.mu.versions.logLock() - if err := d.mu.versions.logAndApply(jobID, &ve, newFileMetrics(ve.NewFiles), func() []compactionInfo { + if err := d.mu.versions.logAndApply(jobID, &ve, newFileMetrics(ve.NewFiles), false /* forceRotation */, func() []compactionInfo { return nil }); err != nil { return nil, err diff --git a/open_test.go b/open_test.go index 68e34ec235..2bc0d819d6 100644 --- a/open_test.go +++ b/open_test.go @@ -101,7 +101,7 @@ func TestNewDBFilenames(t *testing.T) { "LOCK", "MANIFEST-000001", "OPTIONS-000003", - "marker.format-version.000005.006", + "marker.format-version.000007.008", "marker.manifest.000001.MANIFEST-000001", }, } diff --git a/testdata/checkpoint b/testdata/checkpoint index f35759ef82..61ae796975 100644 --- a/testdata/checkpoint +++ b/testdata/checkpoint @@ -34,6 +34,12 @@ sync: db create: db/marker.format-version.000005.006 close: db/marker.format-version.000005.006 sync: db +create: db/marker.format-version.000006.007 +close: db/marker.format-version.000006.007 +sync: db +create: db/marker.format-version.000007.008 +close: db/marker.format-version.000007.008 +sync: db sync: db/MANIFEST-000001 create: db/000002.log sync: db @@ -99,9 +105,9 @@ close: open-dir: checkpoints/checkpoint1 link: db/OPTIONS-000003 -> checkpoints/checkpoint1/OPTIONS-000003 open-dir: checkpoints/checkpoint1 -create: checkpoints/checkpoint1/marker.format-version.000001.006 -sync: checkpoints/checkpoint1/marker.format-version.000001.006 -close: checkpoints/checkpoint1/marker.format-version.000001.006 +create: checkpoints/checkpoint1/marker.format-version.000001.008 +sync: checkpoints/checkpoint1/marker.format-version.000001.008 +close: checkpoints/checkpoint1/marker.format-version.000001.008 sync: checkpoints/checkpoint1 close: checkpoints/checkpoint1 create: checkpoints/checkpoint1/MANIFEST-000001 @@ -156,7 +162,7 @@ CURRENT LOCK MANIFEST-000001 OPTIONS-000003 -marker.format-version.000005.006 +marker.format-version.000007.008 marker.manifest.000001.MANIFEST-000001 list checkpoints/checkpoint1 @@ -166,7 +172,7 @@ list checkpoints/checkpoint1 000007.sst MANIFEST-000001 OPTIONS-000003 -marker.format-version.000001.006 +marker.format-version.000001.008 marker.manifest.000001.MANIFEST-000001 open checkpoints/checkpoint1 readonly diff --git a/testdata/compaction_picker_L0 b/testdata/compaction_picker_L0 index ece043f8a2..a14f7ab34d 100644 --- a/testdata/compaction_picker_L0 +++ b/testdata/compaction_picker_L0 @@ -389,3 +389,25 @@ pick-auto ---- L0 -> L6 L0: 000051,000053 + +# At low priority, find and compact marked-for-compaction files. + +define +L0 + 000049:t.SET.22-t.SET.22 +L6 + 000045:f.SET.0-x.SET.0 +---- +0.0: + 000049:[t#22,SET-t#22,SET] +6: + 000045:[f#0,SET-x#0,SET] + +mark-for-compaction file=000049 +---- +marked L0.000049 + +pick-auto l0_compaction_threshold=1000 +---- +L0 -> L0 +L0: 000049 diff --git a/testdata/event_listener b/testdata/event_listener index 67f3369b24..03dd585649 100644 --- a/testdata/event_listener +++ b/testdata/event_listener @@ -42,6 +42,14 @@ create: db/marker.format-version.000005.006 close: db/marker.format-version.000005.006 sync: db upgraded to format version: 006 +create: db/marker.format-version.000006.007 +close: db/marker.format-version.000006.007 +sync: db +upgraded to format version: 007 +create: db/marker.format-version.000007.008 +close: db/marker.format-version.000007.008 +sync: db +upgraded to format version: 008 create: db/MANIFEST-000003 close: db/MANIFEST-000001 sync: db/MANIFEST-000003 @@ -65,10 +73,10 @@ sync: wal/000002.log close: wal/000002.log create: wal/000005.log sync: wal -[JOB 2] WAL created 000005 -[JOB 3] flushing 1 memtable to L0 +[JOB 3] WAL created 000005 +[JOB 4] flushing 1 memtable to L0 create: db/000006.sst -[JOB 3] flushing: sstable created 000006 +[JOB 4] flushing: sstable created 000006 sync: db/000006.sst close: db/000006.sst sync: db @@ -78,9 +86,9 @@ sync: db/MANIFEST-000007 create: db/marker.manifest.000003.MANIFEST-000007 close: db/marker.manifest.000003.MANIFEST-000007 sync: db -[JOB 3] MANIFEST created 000007 -[JOB 3] flushed 1 memtable to L0 [000006] (770 B), in 1.0s (2.0s total), output rate 770 B/s -[JOB 3] MANIFEST deleted 000001 +[JOB 4] MANIFEST created 000007 +[JOB 4] flushed 1 memtable to L0 [000006] (770 B), in 1.0s (2.0s total), output rate 770 B/s +[JOB 4] MANIFEST deleted 000001 compact ---- @@ -89,10 +97,10 @@ sync: wal/000005.log close: wal/000005.log reuseForWrite: wal/000002.log -> wal/000008.log sync: wal -[JOB 4] WAL created 000008 (recycled 000002) -[JOB 5] flushing 1 memtable to L0 +[JOB 5] WAL created 000008 (recycled 000002) +[JOB 6] flushing 1 memtable to L0 create: db/000009.sst -[JOB 5] flushing: sstable created 000009 +[JOB 6] flushing: sstable created 000009 sync: db/000009.sst close: db/000009.sst sync: db @@ -102,12 +110,12 @@ sync: db/MANIFEST-000010 create: db/marker.manifest.000004.MANIFEST-000010 close: db/marker.manifest.000004.MANIFEST-000010 sync: db -[JOB 5] MANIFEST created 000010 -[JOB 5] flushed 1 memtable to L0 [000009] (770 B), in 1.0s (2.0s total), output rate 770 B/s -[JOB 5] MANIFEST deleted 000003 -[JOB 6] compacting(default) L0 [000006 000009] (1.5 K) + L6 [] (0 B) +[JOB 6] MANIFEST created 000010 +[JOB 6] flushed 1 memtable to L0 [000009] (770 B), in 1.0s (2.0s total), output rate 770 B/s +[JOB 6] MANIFEST deleted 000003 +[JOB 7] compacting(default) L0 [000006 000009] (1.5 K) + L6 [] (0 B) create: db/000011.sst -[JOB 6] compacting: sstable created 000011 +[JOB 7] compacting: sstable created 000011 sync: db/000011.sst close: db/000011.sst sync: db @@ -117,11 +125,11 @@ sync: db/MANIFEST-000012 create: db/marker.manifest.000005.MANIFEST-000012 close: db/marker.manifest.000005.MANIFEST-000012 sync: db -[JOB 6] MANIFEST created 000012 -[JOB 6] compacted(default) L0 [000006 000009] (1.5 K) + L6 [] (0 B) -> L6 [000011] (770 B), in 1.0s (2.0s total), output rate 770 B/s -[JOB 6] sstable deleted 000006 -[JOB 6] sstable deleted 000009 -[JOB 6] MANIFEST deleted 000007 +[JOB 7] MANIFEST created 000012 +[JOB 7] compacted(default) L0 [000006 000009] (1.5 K) + L6 [] (0 B) -> L6 [000011] (770 B), in 1.0s (2.0s total), output rate 770 B/s +[JOB 7] sstable deleted 000006 +[JOB 7] sstable deleted 000009 +[JOB 7] MANIFEST deleted 000007 disable-file-deletions ---- @@ -133,10 +141,10 @@ sync: wal/000008.log close: wal/000008.log reuseForWrite: wal/000005.log -> wal/000013.log sync: wal -[JOB 7] WAL created 000013 (recycled 000005) -[JOB 8] flushing 1 memtable to L0 +[JOB 8] WAL created 000013 (recycled 000005) +[JOB 9] flushing 1 memtable to L0 create: db/000014.sst -[JOB 8] flushing: sstable created 000014 +[JOB 9] flushing: sstable created 000014 sync: db/000014.sst close: db/000014.sst sync: db @@ -146,17 +154,17 @@ sync: db/MANIFEST-000015 create: db/marker.manifest.000006.MANIFEST-000015 close: db/marker.manifest.000006.MANIFEST-000015 sync: db -[JOB 8] MANIFEST created 000015 -[JOB 8] flushed 1 memtable to L0 [000014] (770 B), in 1.0s (2.0s total), output rate 770 B/s +[JOB 9] MANIFEST created 000015 +[JOB 9] flushed 1 memtable to L0 [000014] (770 B), in 1.0s (2.0s total), output rate 770 B/s enable-file-deletions ---- -[JOB 9] MANIFEST deleted 000010 +[JOB 10] MANIFEST deleted 000010 ingest ---- link: ext/0 -> db/000016.sst -[JOB 10] ingesting: sstable created 000016 +[JOB 11] ingesting: sstable created 000016 sync: db create: db/MANIFEST-000017 close: db/MANIFEST-000015 @@ -164,9 +172,9 @@ sync: db/MANIFEST-000017 create: db/marker.manifest.000007.MANIFEST-000017 close: db/marker.manifest.000007.MANIFEST-000017 sync: db -[JOB 10] MANIFEST created 000017 -[JOB 10] MANIFEST deleted 000012 -[JOB 10] ingested L0:000016 (825 B) +[JOB 11] MANIFEST created 000017 +[JOB 11] MANIFEST deleted 000012 +[JOB 11] ingested L0:000016 (825 B) metrics ---- @@ -182,7 +190,7 @@ __level_____count____size___score______in__ingest(sz_cnt)____move(sz_cnt)___writ total 3 2.3 K - 933 B 825 B 1 0 B 0 3.9 K 4 1.5 K 3 4.3 flush 3 compact 1 2.3 K 0 B 0 (size == estimated-debt, score = in-progress-bytes, in = num-in-progress) - ctype 1 0 0 0 0 (default, delete, elision, move, read) + ctype 1 0 0 0 0 0 (default, delete, elision, move, read, rewrite) memtbl 1 256 K zmemtbl 0 0 B ztbl 0 0 B @@ -209,9 +217,9 @@ close: open-dir: checkpoint link: db/OPTIONS-000004 -> checkpoint/OPTIONS-000004 open-dir: checkpoint -create: checkpoint/marker.format-version.000001.006 -sync: checkpoint/marker.format-version.000001.006 -close: checkpoint/marker.format-version.000001.006 +create: checkpoint/marker.format-version.000001.008 +sync: checkpoint/marker.format-version.000001.008 +close: checkpoint/marker.format-version.000001.008 sync: checkpoint close: checkpoint create: checkpoint/MANIFEST-000017 diff --git a/testdata/ingest b/testdata/ingest index 3018d16950..89fd2133c4 100644 --- a/testdata/ingest +++ b/testdata/ingest @@ -43,7 +43,7 @@ __level_____count____size___score______in__ingest(sz_cnt)____move(sz_cnt)___writ total 1 833 B - 833 B 833 B 1 0 B 0 833 B 0 0 B 1 1.0 flush 0 compact 0 0 B 0 B 0 (size == estimated-debt, score = in-progress-bytes, in = num-in-progress) - ctype 0 0 0 0 0 (default, delete, elision, move, read) + ctype 0 0 0 0 0 0 (default, delete, elision, move, read, rewrite) memtbl 1 256 K zmemtbl 0 0 B ztbl 0 0 B diff --git a/testdata/ingest_load b/testdata/ingest_load index 4b082c3cb3..bd2d91da2c 100644 --- a/testdata/ingest_load +++ b/testdata/ingest_load @@ -89,11 +89,11 @@ a.RANGEDEL.0:b ranges: #0,0-#0,0 # Loading tables at an unsupported table format results in an error. -# Write a table at version 6 (Pebble,v2) into a DB at version 5 (Pebble,v1). -load writer-version=6 db-version=5 +# Write a table at version 7 (Pebble,v2) into a DB at version 6 (Pebble,v1). +load writer-version=8 db-version=7 a.SET.1: ---- -pebble: table with format (Pebble,v2) unsupported at DB format major version 5, (Pebble,v1) +pebble: table with format (Pebble,v2) unsupported at DB format major version 7, (Pebble,v1) # Tables with range keys only. diff --git a/testdata/marked_for_compaction b/testdata/marked_for_compaction new file mode 100644 index 0000000000..5464a24aa8 --- /dev/null +++ b/testdata/marked_for_compaction @@ -0,0 +1,28 @@ +define +L0 + c.SET.11:foo +L1 + c.SET.0:foo + d.SET.0:foo +---- +0.0: + 000004:[c#11,SET-c#11,SET] points:[c#11,SET-c#11,SET] +1: + 000005:[c#0,SET-d#0,SET] points:[c#0,SET-d#0,SET] + +mark-for-compaction file=000005 +---- +marked L1.000005 + +mark-for-compaction file=000004 +---- +marked L0.000004 + +maybe-compact +---- +[JOB 100] compacted(rewrite) L1 [000005] (779 B) + L1 [] (0 B) -> L1 [000006] (779 B), in 1.0s (2.0s total), output rate 779 B/s +[JOB 100] compacted(rewrite) L0 [000004] (773 B) + L0 [] (0 B) -> L0 [000007] (773 B), in 1.0s (2.0s total), output rate 773 B/s +0.0: + 000007:[c#11,SET-c#11,SET] points:[c#11,SET-c#11,SET] +1: + 000006:[c#0,SET-d#0,SET] points:[c#0,SET-d#0,SET] diff --git a/testdata/metrics b/testdata/metrics index b4f9891b8d..adcf53ee55 100644 --- a/testdata/metrics +++ b/testdata/metrics @@ -29,7 +29,7 @@ __level_____count____size___score______in__ingest(sz_cnt)____move(sz_cnt)___writ total 1 771 B - 56 B 0 B 0 0 B 0 827 B 1 0 B 1 14.8 flush 1 compact 0 0 B 0 B 0 (size == estimated-debt, score = in-progress-bytes, in = num-in-progress) - ctype 0 0 0 0 0 (default, delete, elision, move, read) + ctype 0 0 0 0 0 0 (default, delete, elision, move, read, rewrite) memtbl 1 256 K zmemtbl 1 256 K ztbl 0 0 B @@ -77,7 +77,7 @@ __level_____count____size___score______in__ingest(sz_cnt)____move(sz_cnt)___writ total 1 778 B - 84 B 0 B 0 0 B 0 2.3 K 3 1.5 K 1 28.6 flush 2 compact 1 0 B 0 B 0 (size == estimated-debt, score = in-progress-bytes, in = num-in-progress) - ctype 1 0 0 0 0 (default, delete, elision, move, read) + ctype 1 0 0 0 0 0 (default, delete, elision, move, read, rewrite) memtbl 1 256 K zmemtbl 2 512 K ztbl 2 1.5 K @@ -110,7 +110,7 @@ __level_____count____size___score______in__ingest(sz_cnt)____move(sz_cnt)___writ total 1 778 B - 84 B 0 B 0 0 B 0 2.3 K 3 1.5 K 1 28.6 flush 2 compact 1 0 B 0 B 0 (size == estimated-debt, score = in-progress-bytes, in = num-in-progress) - ctype 1 0 0 0 0 (default, delete, elision, move, read) + ctype 1 0 0 0 0 0 (default, delete, elision, move, read, rewrite) memtbl 1 256 K zmemtbl 1 256 K ztbl 2 1.5 K @@ -140,7 +140,7 @@ __level_____count____size___score______in__ingest(sz_cnt)____move(sz_cnt)___writ total 1 778 B - 84 B 0 B 0 0 B 0 2.3 K 3 1.5 K 1 28.6 flush 2 compact 1 0 B 0 B 0 (size == estimated-debt, score = in-progress-bytes, in = num-in-progress) - ctype 1 0 0 0 0 (default, delete, elision, move, read) + ctype 1 0 0 0 0 0 (default, delete, elision, move, read, rewrite) memtbl 1 256 K zmemtbl 1 256 K ztbl 1 771 B @@ -173,7 +173,7 @@ __level_____count____size___score______in__ingest(sz_cnt)____move(sz_cnt)___writ total 1 778 B - 84 B 0 B 0 0 B 0 2.3 K 3 1.5 K 1 28.6 flush 2 compact 1 0 B 0 B 0 (size == estimated-debt, score = in-progress-bytes, in = num-in-progress) - ctype 1 0 0 0 0 (default, delete, elision, move, read) + ctype 1 0 0 0 0 0 (default, delete, elision, move, read, rewrite) memtbl 1 256 K zmemtbl 0 0 B ztbl 0 0 B diff --git a/testdata/rangekeys b/testdata/rangekeys index 98eb0bce45..0794e00c7c 100644 --- a/testdata/rangekeys +++ b/testdata/rangekeys @@ -508,24 +508,24 @@ x: valid (., [x-z) @5=boop) x: valid (., [x-z) @5=boop) # Applying range keys to a DB running with a version that doesn't support them -# results in an error. Range keys were added in version 6. -reset format-major-version=5 +# results in an error. Range keys were added in version 7. +reset format-major-version=6 ---- batch range-key-set a z @5 boop ---- -pebble: range keys require at least format major version 6 (current: 5) +pebble: range keys require at least format major version 8 (current: 6) # Constructing iterator over range keys on a DB that doesn't support them # results in an error. -reset format-major-version=5 +reset format-major-version=6 ---- combined-iter ---- -pebble: range keys require at least format major version 6 (current: 5) +pebble: range keys require at least format major version 8 (current: 6) # Test iterator bounds provided via IterOptions. diff --git a/testdata/split_user_key_migration b/testdata/split_user_key_migration new file mode 100644 index 0000000000..78a3ccead0 --- /dev/null +++ b/testdata/split_user_key_migration @@ -0,0 +1,148 @@ +define +L1 +d.SET.110:d e.SET.140:e +---- +1: + 000004:[d#110,SET-e#140,SET] points:[d#110,SET-e#140,SET] + +reopen +---- +OK + +# The current public Pebble interface offers no way of constructing a multi-file +# atomic compaction unit, so use the force-ingest command to force an ingestion +# into L1. + +build ef +set e e +set f f +---- + +force-ingest paths=(ef) level=1 +---- +1: + 000004:[d#110,SET-e#140,SET] points:[d#110,SET-e#140,SET] + 000008:[e#1,SET-f#1,SET] points:[e#1,SET-f#1,SET] + +format-major-version +---- +005 + +marked-file-count +---- +0 files marked for compaction + +ratchet-format-major-version 006 +---- + +format-major-version +---- +006 + +# Upgrading to format major version 006 should've marked files for compaction. + +marked-file-count +---- +2 files marked for compaction + +reopen +---- +OK + +format-major-version +---- +006 + +# Ensure the files previously marked for compaction are still marked for +# compaction. + +marked-file-count +---- +2 files marked for compaction + +# Ratcheting to 007 should force compaction of any files still marked for +# compaction. + +ratchet-format-major-version 007 +---- +[JOB 100] compacted(rewrite) L1 [000004 000008] (1.6 K) + L1 [] (0 B) -> L1 [000013] (786 B), in 1.0s (2.0s total), output rate 786 B/s + +format-major-version +---- +007 + +lsm +---- +1: + 000013:[d#0,SET-f#0,SET] + +# Reset to a new LSM. + +define +L1 +b.SET.0:b c.SET.5:c +L1 +m.SET.0:m l.SET.5:l +L1 +x.SET.0:x y.SET.5:y +---- +1: + 000004:[b#0,SET-c#5,SET] points:[b#0,SET-c#5,SET] + 000005:[l#5,SET-m#0,SET] points:[l#5,SET-m#0,SET] + 000006:[x#0,SET-y#5,SET] points:[x#0,SET-y#5,SET] + +build ab +set a a +set b b +---- + +build cd +set c c +set d d +---- + +build wx +set w w +set x x +---- + +force-ingest paths=(ab, cd, wx) level=1 +---- +1: + 000007:[a#1,SET-b#1,SET] points:[a#1,SET-b#1,SET] + 000004:[b#0,SET-c#5,SET] points:[b#0,SET-c#5,SET] + 000008:[c#2,SET-d#2,SET] points:[c#2,SET-d#2,SET] + 000005:[l#5,SET-m#0,SET] points:[l#5,SET-m#0,SET] + 000009:[w#3,SET-x#3,SET] points:[w#3,SET-x#3,SET] + 000006:[x#0,SET-y#5,SET] points:[x#0,SET-y#5,SET] + +format-major-version +---- +005 + +ratchet-format-major-version 006 +---- + +format-major-version +---- +006 + +marked-file-count +---- +5 files marked for compaction + +ratchet-format-major-version 007 +---- +[JOB 100] compacted(rewrite) L1 [000007 000004 000008] (2.4 K) + L1 [] (0 B) -> L1 [000011] (794 B), in 1.0s (2.0s total), output rate 794 B/s +[JOB 100] compacted(rewrite) L1 [000009 000006] (1.6 K) + L1 [] (0 B) -> L1 [000012] (786 B), in 1.0s (2.0s total), output rate 786 B/s + +lsm +---- +1: + 000011:[a#0,SET-d#0,SET] + 000005:[l#5,SET-m#0,SET] + 000012:[w#0,SET-y#0,SET] + +format-major-version +---- +007 diff --git a/tool/testdata/db_lsm b/tool/testdata/db_lsm index ae4e3cf758..d60f1d8c0d 100644 --- a/tool/testdata/db_lsm +++ b/tool/testdata/db_lsm @@ -22,7 +22,7 @@ __level_____count____size___score______in__ingest(sz_cnt)____move(sz_cnt)___writ total 1 986 B - 0 B 0 B 0 0 B 0 0 B 0 0 B 0 0.0 flush 0 compact 0 0 B 0 B 0 (size == estimated-debt, score = in-progress-bytes, in = num-in-progress) - ctype 0 0 0 0 0 (default, delete, elision, move, read) + ctype 0 0 0 0 0 0 (default, delete, elision, move, read, rewrite) memtbl 1 256 K zmemtbl 0 0 B ztbl 0 0 B diff --git a/version_set.go b/version_set.go index 18b655e28f..bd5091ef9e 100644 --- a/version_set.go +++ b/version_set.go @@ -346,6 +346,7 @@ func (vs *versionSet) logAndApply( jobID int, ve *versionEdit, metrics map[int]*LevelMetrics, + forceRotation bool, inProgressCompactions func() []compactionInfo, ) error { if !vs.writing { @@ -394,7 +395,7 @@ func (vs *versionSet) logAndApply( // is too large. var newManifestFileNum FileNum var prevManifestFileSize uint64 - if vs.manifest == nil || vs.manifest.Size() >= vs.opts.MaxManifestFileSize { + if forceRotation || vs.manifest == nil || vs.manifest.Size() >= vs.opts.MaxManifestFileSize { newManifestFileNum = vs.getNextFileNum() prevManifestFileSize = uint64(vs.manifest.Size()) } @@ -550,6 +551,10 @@ func (vs *versionSet) incrementCompactions(kind compactionKind) { case compactionKindRead: vs.metrics.Compact.Count++ vs.metrics.Compact.ReadCount++ + + case compactionKindRewrite: + vs.metrics.Compact.Count++ + vs.metrics.Compact.RewriteCount++ } }