diff --git a/compaction.go b/compaction.go index 9d21edc2f7..c50bebf6cf 100644 --- a/compaction.go +++ b/compaction.go @@ -285,6 +285,7 @@ const ( compactionKindDeleteOnly compactionKindElisionOnly compactionKindRead + compactionKindRewrite ) func (k compactionKind) String() string { @@ -301,6 +302,8 @@ func (k compactionKind) String() string { return "elision-only" case compactionKindRead: return "read" + case compactionKindRewrite: + return "rewrite" } return "?" } @@ -320,9 +323,9 @@ type compaction struct { // startLevel is the level that is being compacted. Inputs from startLevel // and outputLevel will be merged to produce a set of outputLevel files. startLevel *compactionLevel - // outputLevel is the level that files are being produced in. outputLevel is - // equal to startLevel+1 except when startLevel is 0 in which case it is - // equal to compactionPicker.baseLevel(). + // outputLevel is the level that files are being produced in. For default + // compactions, outputLevel is equal to startLevel+1 except when startLevel + // is 0 in which case it is equal to compactionPicker.baseLevel(). outputLevel *compactionLevel inputs []compactionLevel @@ -1433,7 +1436,7 @@ func (d *DB) flush1() error { } d.mu.versions.logLock() - err = d.mu.versions.logAndApply(jobID, ve, c.metrics, + err = d.mu.versions.logAndApply(jobID, ve, c.metrics, false, /* forceRotation */ func() []compactionInfo { return d.getInProgressCompactionInfoLocked(c) }) if err != nil { // TODO(peter): untested. 
@@ -1857,7 +1860,7 @@ func (d *DB) compact1(c *compaction, errChannel chan error) (err error) { info.Duration = d.timeNow().Sub(startTime) if err == nil { d.mu.versions.logLock() - err = d.mu.versions.logAndApply(jobID, ve, c.metrics, func() []compactionInfo { + err = d.mu.versions.logAndApply(jobID, ve, c.metrics, false /* forceRotation */, func() []compactionInfo { return d.getInProgressCompactionInfoLocked(c) }) if err != nil { diff --git a/compaction_picker.go b/compaction_picker.go index ed518fdb82..761f57ce7c 100644 --- a/compaction_picker.go +++ b/compaction_picker.go @@ -37,6 +37,7 @@ type compactionPicker interface { pickAuto(env compactionEnv) (pc *pickedCompaction) pickManual(env compactionEnv, manual *manualCompaction) (c *pickedCompaction, retryLater bool) pickElisionOnlyCompaction(env compactionEnv) (pc *pickedCompaction) + pickRewriteCompaction(env compactionEnv) (pc *pickedCompaction) pickReadTriggeredCompaction(env compactionEnv) (pc *pickedCompaction) forceBaseLevel1() } @@ -109,9 +110,9 @@ type pickedCompaction struct { // startLevel is the level that is being compacted. Inputs from startLevel // and outputLevel will be merged to produce a set of outputLevel files. startLevel *compactionLevel - // outputLevel is the level that files are being produced in. outputLevel is - // equal to startLevel+1 except when startLevel is 0 in which case it is - // equal to compactionPicker.baseLevel(). + // outputLevel is the level that files are being produced in. In default + // compactions, outputLevel is equal to startLevel+1 except when startLevel + // is 0 in which case it is equal to compactionPicker.baseLevel(). 
outputLevel *compactionLevel // adjustedOutputLevel is the output level used for the purpose of // determining the target output file size, overlap bytes, and expanded @@ -1066,6 +1067,25 @@ func (p *compactionPickerByScore) pickAuto(env compactionEnv) (pc *pickedCompact *env.readCompactionEnv.rescheduleReadCompaction = true } + // At the lowest possible compaction-picking priority, look for files marked + // for compaction. Pebble will mark files for compaction if they have atomic + // compaction units that span multiple files. While current Pebble code does + // not construct such sstables, RocksDB and earlier versions of Pebble may + // have created them. These split user keys form sets of files that must be + // compacted together for correctness (referred to as "atomic compaction + // units" within the code). Rewrite them in-place. + // + // It's also possible that a file may have been marked for compaction by + // even earlier versions of Pebble code, since FileMetadata's + // MarkedForCompaction field is persisted in the manifest. That's okay. We + // previously would've ignored the designation, whereas now we'll re-compact + // the file in place. + if p.vers.Stats.MarkedForCompaction > 0 { + if pc := p.pickRewriteCompaction(env); pc != nil { + return pc + } + } + return nil } @@ -1129,6 +1149,46 @@ func (a elisionOnlyAnnotator) Merge(v interface{}, accum interface{}) interface{ return accumV } +// markedForCompactionAnnotator implements the manifest.Annotator interface, +// annotating B-Tree nodes with the *fileMetadata of a file that is marked for +// compaction within the subtree. If multiple files meet the criteria, it +// chooses whichever file has the lowest LargestSeqNum. 
+type markedForCompactionAnnotator struct{} + +var _ manifest.Annotator = markedForCompactionAnnotator{} + +func (a markedForCompactionAnnotator) Zero(interface{}) interface{} { + return nil +} + +func (a markedForCompactionAnnotator) Accumulate( + f *fileMetadata, dst interface{}, +) (interface{}, bool) { + if !f.MarkedForCompaction { + // Not marked for compaction; return dst. + return dst, true + } + return markedMergeHelper(f, dst) +} + +func (a markedForCompactionAnnotator) Merge(v interface{}, accum interface{}) interface{} { + if v == nil { + return accum + } + accum, _ = markedMergeHelper(v.(*fileMetadata), accum) + return accum +} + +// REQUIRES: f is non-nil, and f.MarkedForCompaction=true. +func markedMergeHelper(f *fileMetadata, dst interface{}) (interface{}, bool) { + if dst == nil { + return f, true + } else if dstV := dst.(*fileMetadata); dstV.LargestSeqNum > f.LargestSeqNum { + return f, true + } + return dst, true +} + // pickElisionOnlyCompaction looks for compactions of sstables in the // bottommost level containing obsolete records that may now be dropped. func (p *compactionPickerByScore) pickElisionOnlyCompaction( @@ -1164,6 +1224,66 @@ func (p *compactionPickerByScore) pickElisionOnlyCompaction( return nil } +// pickRewriteCompaction attempts to construct a compaction that +// rewrites a file marked for compaction. pickRewriteCompaction will +// pull in adjacent files in the file's atomic compaction unit if +// necessary. A rewrite compaction outputs files to the same level as +// the input level. +func (p *compactionPickerByScore) pickRewriteCompaction(env compactionEnv) (pc *pickedCompaction) { + for l := numLevels - 1; l >= 0; l-- { + v := p.vers.Levels[l].Annotation(markedForCompactionAnnotator{}) + if v == nil { + // Try the next level. + continue + } + candidate := v.(*fileMetadata) + if candidate.Compacting { + // Try the next level. 
+ continue + } + lf := p.vers.Levels[l].Find(p.opts.Comparer.Compare, candidate) + if lf == nil { + panic(fmt.Sprintf("file %s not found in level %d as expected", candidate.FileNum, l)) + } + + inputs := lf.Slice() + // L0 files generated by a flush have never been split such that + // adjacent files can contain the same user key. So we do not need to + // rewrite an atomic compaction unit for L0. Note that there is nothing + // preventing two different flushes from producing files that are + // non-overlapping from an InternalKey perspective, but span the same + // user key. However, such files cannot be in the same L0 sublevel, + // since each sublevel requires non-overlapping user keys (unlike other + // levels). + if l > 0 { + // Find this file's atomic compaction unit. This is only relevant + // for levels L1+. + var isCompacting bool + inputs, isCompacting = expandToAtomicUnit( + p.opts.Comparer.Compare, + inputs, + false, /* disableIsCompacting */ + ) + if isCompacting { + // Try the next level. + continue + } + } + + pc = newPickedCompaction(p.opts, p.vers, l, l, p.baseLevel) + pc.outputLevel.level = l + pc.kind = compactionKindRewrite + pc.startLevel.files = inputs + pc.smallest, pc.largest = manifest.KeyRange(pc.cmp, pc.startLevel.files.Iter()) + + // Fail-safe to protect against compacting the same sstable concurrently. + if !inputRangeAlreadyCompacting(env, pc) { + return pc + } + } + return nil +} + // pickAutoLPositive picks an automatic compaction for the candidate // file in a positive-numbered level. This function must not be used for // L0. 
diff --git a/compaction_picker_test.go b/compaction_picker_test.go index eba520a78a..297f7bcdde 100644 --- a/compaction_picker_test.go +++ b/compaction_picker_test.go @@ -536,6 +536,22 @@ func TestCompactionPickerL0(t *testing.T) { return "nil" } return result.String() + case "mark-for-compaction": + var fileNum uint64 + td.ScanArgs(t, "file", &fileNum) + for l, lm := range picker.vers.Levels { + iter := lm.Iter() + for f := iter.First(); f != nil; f = iter.Next() { + if f.FileNum != base.FileNum(fileNum) { + continue + } + f.MarkedForCompaction = true + picker.vers.Stats.MarkedForCompaction++ + picker.vers.Levels[l].InvalidateAnnotation(markedForCompactionAnnotator{}) + return fmt.Sprintf("marked L%d.%s", l, f.FileNum) + } + } + return "not-found" case "max-output-file-size": if pc == nil { return "no compaction" diff --git a/compaction_test.go b/compaction_test.go index 7225efae99..e9ee8f3963 100644 --- a/compaction_test.go +++ b/compaction_test.go @@ -96,6 +96,12 @@ func (p *compactionPickerForTesting) pickElisionOnlyCompaction( return nil } +func (p *compactionPickerForTesting) pickRewriteCompaction( + env compactionEnv, +) (pc *pickedCompaction) { + return nil +} + func (p *compactionPickerForTesting) pickManual( env compactionEnv, manual *manualCompaction, ) (pc *pickedCompaction, retryLater bool) { @@ -3348,3 +3354,104 @@ func Test_calculateInuseKeyRanges(t *testing.T) { }) } } + +func TestMarkedForCompaction(t *testing.T) { + var mem vfs.FS = vfs.NewMem() + var d *DB + defer func() { + if d != nil { + require.NoError(t, d.Close()) + } + }() + + var buf bytes.Buffer + opts := &Options{ + FS: mem, + DebugCheck: DebugCheckLevels, + DisableAutomaticCompactions: true, + FormatMajorVersion: FormatNewest, + EventListener: EventListener{ + CompactionEnd: func(info CompactionInfo) { + info.JobID = 100 // Fix to avoid nondeterminism. 
+ fmt.Fprintln(&buf, info) + }, + }, + } + + reset := func() { + if d != nil { + require.NoError(t, d.Close()) + } + mem = vfs.NewMem() + require.NoError(t, mem.MkdirAll("ext", 0755)) + + var err error + d, err = Open("", opts) + require.NoError(t, err) + } + datadriven.RunTest(t, "testdata/marked_for_compaction", func(td *datadriven.TestData) string { + switch td.Cmd { + case "reset": + reset() + return "" + + case "define": + if d != nil { + if err := d.Close(); err != nil { + return err.Error() + } + } + var err error + if d, err = runDBDefineCmd(td, opts); err != nil { + return err.Error() + } + d.mu.Lock() + defer d.mu.Unlock() + t := time.Now() + d.timeNow = func() time.Time { + t = t.Add(time.Second) + return t + } + s := d.mu.versions.currentVersion().DebugString(base.DefaultFormatter) + return s + + case "mark-for-compaction": + d.mu.Lock() + defer d.mu.Unlock() + vers := d.mu.versions.currentVersion() + var fileNum uint64 + td.ScanArgs(t, "file", &fileNum) + for l, lm := range vers.Levels { + iter := lm.Iter() + for f := iter.First(); f != nil; f = iter.Next() { + if f.FileNum != base.FileNum(fileNum) { + continue + } + f.MarkedForCompaction = true + vers.Stats.MarkedForCompaction++ + vers.Levels[l].InvalidateAnnotation(markedForCompactionAnnotator{}) + return fmt.Sprintf("marked L%d.%s", l, f.FileNum) + } + } + return "not-found" + + case "maybe-compact": + d.mu.Lock() + defer d.mu.Unlock() + d.opts.DisableAutomaticCompactions = false + d.maybeScheduleCompaction() + for d.mu.compact.compactingCount > 0 { + d.mu.compact.cond.Wait() + } + + fmt.Fprintln(&buf, d.mu.versions.currentVersion().DebugString(base.DefaultFormatter)) + s := strings.TrimSpace(buf.String()) + buf.Reset() + opts.DisableAutomaticCompactions = true + return s + + default: + return fmt.Sprintf("unknown command: %s", td.Cmd) + } + }) +} diff --git a/data_test.go b/data_test.go index b9e1173785..4470ab1faf 100644 --- a/data_test.go +++ b/data_test.go @@ -729,7 +729,7 @@ func 
runDBDefineCmd(td *datadriven.TestData, opts *Options) (*DB, error) { jobID := d.mu.nextJobID d.mu.nextJobID++ d.mu.versions.logLock() - if err := d.mu.versions.logAndApply(jobID, ve, newFileMetrics(ve.NewFiles), func() []compactionInfo { + if err := d.mu.versions.logAndApply(jobID, ve, newFileMetrics(ve.NewFiles), false /* forceRotation */, func() []compactionInfo { return nil }); err != nil { return nil, err @@ -822,6 +822,34 @@ func runIngestCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error { return nil } +func runForceIngestCmd(td *datadriven.TestData, d *DB) error { + var paths []string + var level int + for _, arg := range td.CmdArgs { + switch arg.Key { + case "paths": + paths = append(paths, arg.Vals...) + case "level": + var err error + level, err = strconv.Atoi(arg.Vals[0]) + if err != nil { + return err + } + } + } + return d.ingest(paths, func( + tableNewIters, + IterOptions, + Compare, + *version, + int, + map[*compaction]struct{}, + *fileMetadata, + ) (int, error) { + return level, nil + }) +} + func runLSMCmd(td *datadriven.TestData, d *DB) string { d.mu.Lock() s := d.mu.versions.currentVersion().String() diff --git a/db.go b/db.go index d8ecd83f0e..47c9f87256 100644 --- a/db.go +++ b/db.go @@ -320,6 +320,11 @@ type DB struct { // marker is moved in order to atomically record the new // version. marker *atomicfs.Marker + // ratcheting when set to true indicates that the database is + // currently in the process of ratcheting the format major version + // to vers + 1. As a part of ratcheting the format major version, + // migrations may drop and re-acquire the mutex. + ratcheting bool } // The ID of the next job. 
Job IDs are passed to event listener @@ -1394,6 +1399,7 @@ func (d *DB) Metrics() *Metrics { metrics.Compact.EstimatedDebt = d.mu.versions.picker.estimatedCompactionDebt(0) metrics.Compact.InProgressBytes = atomic.LoadInt64(&d.mu.versions.atomic.atomicInProgressBytes) metrics.Compact.NumInProgress = int64(d.mu.compact.compactingCount) + metrics.Compact.MarkedFiles = d.mu.versions.currentVersion().Stats.MarkedForCompaction for _, m := range d.mu.mem.queue { metrics.MemTable.Size += m.totalBytes() } diff --git a/flush_external.go b/flush_external.go index c9530b03d9..179beb2127 100644 --- a/flush_external.go +++ b/flush_external.go @@ -74,7 +74,7 @@ func flushExternalTable(untypedDB interface{}, path string, originalMeta *fileMe TablesIngested: 1, }, } - err := d.mu.versions.logAndApply(jobID, ve, metrics, func() []compactionInfo { + err := d.mu.versions.logAndApply(jobID, ve, metrics, false /* forceRotation */, func() []compactionInfo { return d.getInProgressCompactionInfoLocked(nil) }) if err != nil { diff --git a/format_major_version.go b/format_major_version.go index 517074382d..1246d1333f 100644 --- a/format_major_version.go +++ b/format_major_version.go @@ -10,6 +10,7 @@ import ( "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/internal/base" + "github.com/cockroachdb/pebble/internal/manifest" "github.com/cockroachdb/pebble/sstable" "github.com/cockroachdb/pebble/vfs" "github.com/cockroachdb/pebble/vfs/atomicfs" @@ -70,6 +71,20 @@ const ( // FormatBlockPropertyCollector is a format major version that introduces // BlockPropertyCollectors. FormatBlockPropertyCollector + // FormatSplitUserKeysMarked is a format major version that guarantees that + // all files that share user keys with neighbors are marked for compaction + // in the manifest. Ratcheting to FormatSplitUserKeysMarked will block + // (without holding mutexes) until the scan of the LSM is complete and the + // manifest has been rotated. 
+ FormatSplitUserKeysMarked + // FormatMarkedCompacted is a format major version that guarantees that all + // files explicitly marked for compaction in the manifest have been + // compacted. Combined with the FormatSplitUserKeysMarked format major + // version, this version guarantees that there are no user keys split across + // multiple files within a level L1+. Ratcheting to this format version will + // block (without holding mutexes) until all necessary compactions for files + // marked for compaction are complete. + FormatMarkedCompacted // FormatRangeKeys is a format major version that introduces range keys. FormatRangeKeys // FormatNewest always contains the most recent format major version. @@ -86,7 +101,7 @@ func (v FormatMajorVersion) MaxTableFormat() sstable.TableFormat { case FormatDefault, FormatMostCompatible, formatVersionedManifestMarker, FormatVersioned, FormatSetWithDelete: return sstable.TableFormatRocksDBv2 - case FormatBlockPropertyCollector: + case FormatBlockPropertyCollector, FormatSplitUserKeysMarked, FormatMarkedCompacted: return sstable.TableFormatPebblev1 case FormatRangeKeys: return sstable.TableFormatPebblev2 @@ -171,6 +186,23 @@ var formatMajorVersionMigrations = map[FormatMajorVersion]func(*DB) error{ FormatBlockPropertyCollector: func(d *DB) error { return d.finalizeFormatVersUpgrade(FormatBlockPropertyCollector) }, + FormatSplitUserKeysMarked: func(d *DB) error { + // Mark any unmarked files with split-user keys. Note all format major + // versions migrations are invoked with DB.mu locked. + if err := d.markFilesWithSplitUserKeysLocked(); err != nil { + return err + } + return d.finalizeFormatVersUpgrade(FormatSplitUserKeysMarked) + }, + FormatMarkedCompacted: func(d *DB) error { + // Before finalizing the format major version, rewrite any sstables + // still marked for compaction. Note all format major versions + // migrations are invoked with DB.mu locked. 
+ if err := d.compactMarkedFilesLocked(); err != nil { + return err + } + return d.finalizeFormatVersUpgrade(FormatMarkedCompacted) + }, FormatRangeKeys: func(d *DB) error { return d.finalizeFormatVersUpgrade(FormatRangeKeys) }, @@ -240,6 +272,12 @@ func (d *DB) ratchetFormatMajorVersionLocked(formatVers FormatMajorVersion) erro return errors.Newf("pebble: database already at format major version %d; cannot reduce to %d", d.mu.formatVers.vers, formatVers) } + if d.mu.formatVers.ratcheting { + return errors.Newf("pebble: database format major version upgrade is in-progress") + } + d.mu.formatVers.ratcheting = true + defer func() { d.mu.formatVers.ratcheting = false }() + for nextVers := d.mu.formatVers.vers + 1; nextVers <= formatVers; nextVers++ { if err := formatMajorVersionMigrations[nextVers](d); err != nil { return errors.Wrapf(err, "migrating to version %d", nextVers) @@ -274,3 +312,150 @@ func (d *DB) finalizeFormatVersUpgrade(formatVers FormatMajorVersion) error { d.opts.EventListener.FormatUpgrade(formatVers) return nil } + +// compactMarkedFilesLocked performs a migration that schedules rewrite +// compactions to compact away any sstables marked for compaction. +// compactMarkedFilesLocked is run while ratcheting the database's format major +// version to FormatMarkedCompacted. +func (d *DB) compactMarkedFilesLocked() error { + curr := d.mu.versions.currentVersion() + for curr.Stats.MarkedForCompaction > 0 { + // Attempt to schedule a compaction to rewrite a file marked for + // compaction. + d.maybeScheduleCompactionPicker(func(picker compactionPicker, env compactionEnv) *pickedCompaction { + return picker.pickRewriteCompaction(env) + }) + + // The above attempt might succeed and schedule a rewrite compaction. Or + // there might not be available compaction concurrency to schedule the + // compaction. Or compaction of the file might have already been in + // progress. In any scenario, wait until there's some change in the + // state of active compactions. 
+ + // Before waiting, check that the database hasn't been closed. Trying to + // schedule the compaction may have dropped d.mu while waiting for a + // manifest write to complete. In that dropped interim, the database may + // have been closed. + if err := d.closed.Load(); err != nil { + return err.(error) + } + // NB: Waiting on this condition variable drops d.mu while blocked. + d.mu.compact.cond.Wait() + + // Some flush or compaction was scheduled or completed. Loop again to + // check again for files that must be compacted. The next iteration may + // find the same file again, but that's okay. It'll eventually succeed in + // scheduling the compaction and eventually be woken by its completion. + curr = d.mu.versions.currentVersion() + } + return nil +} + +// markFilesWithSplitUserKeysLocked scans the LSM's levels 1 through 6 for +// adjacent files that contain the same user key. Such arrangements of files +// were permitted in RocksDB and in Pebble up to SHA a860bbad. +// markFilesWithSplitUserKeysLocked durably marks such files for compaction. +func (d *DB) markFilesWithSplitUserKeysLocked() error { + jobID := d.mu.nextJobID + d.mu.nextJobID++ + + // Files with split user keys are expected to be rare and performing key + // comparisons for every file within the LSM is expensive, so drop the + // database lock while scanning the file metadata. + // + // After scanning, if we found files to mark, we'll re-acquire the mutex + // before marking them, since MarkedForCompaction and the version's + // Stats.MarkedForCompaction are protected by d.mu. + var files [numLevels][]*fileMetadata + var foundSplitUserKey bool + func() { + equal := d.opts.equal() + vers := d.mu.versions.currentVersion() + // Note the unusual locking: unlock, defer Lock(). 
+ d.mu.Unlock() + defer d.mu.Lock() + + for l := numLevels - 1; l > 0; l-- { + iter := vers.Levels[l].Iter() + var prevFile *fileMetadata + var prevUserKey []byte + for f := iter.First(); f != nil; f = iter.Next() { + if prevUserKey != nil && equal(prevUserKey, f.Smallest.UserKey) { + // NB: We may append a file twice, once as prevFile and once + // as f. That's okay, and handled below. + files[l] = append(files[l], prevFile, f) + foundSplitUserKey = true + } + if f.Largest.IsExclusiveSentinel() { + prevUserKey = nil + prevFile = nil + } else { + prevUserKey = f.Largest.UserKey + prevFile = f + } + } + } + }() + + // The database lock has been acquired again by the defer within the above + // anonymous function. + if !foundSplitUserKey { + // Nothing to do. + return nil + } + + // Lock the manifest for a coherent view of the LSM. + d.mu.versions.logLock() + vers := d.mu.versions.currentVersion() + for l, filesToMark := range files { + if len(filesToMark) == 0 { + continue + } + + for _, f := range filesToMark { + if f.MarkedForCompaction { + continue + } + + // We need to determine if the file still exists in the current + // version of the LSM. Note that we're making the assumption that if + // f still exists within the LSM, it's within the same level. That's + // okay, because only move compactions may move files between levels + // and move compactions only apply to compactions with a single file + // in the start level, but we already know these files have + // multi-file atomic compaction units. + if m := vers.Levels[l].Find(d.cmp, f); m != nil { + // It's still there. Mark it for compaction and update the + // version's stats. + f.MarkedForCompaction = true + vers.Stats.MarkedForCompaction++ + } + } + // The compaction picker uses the markedForCompactionAnnotator to + // quickly find files marked for compaction, or to quickly determine + // that there are no such files marked for compaction within a level. 
+ // A b-tree node may be annotated with an annotation recording that + // there are no files marked for compaction within the node's subtree, + // based on the assumption that it's static. + // + // Since we're marking files for compaction, these b-tree nodes' + // annotations will be out of date. Clear the compaction-picking + // annotation, so that it's recomputed the next time the compaction + // picker looks for a file marked for compaction. + vers.Levels[l].InvalidateAnnotation(markedForCompactionAnnotator{}) + } + + // The 'marked-for-compaction' bit is persisted in the MANIFEST file + // metadata. We've already modified the in-memory file metadata, but the + // manifest hasn't been updated. Force rotation to a new MANIFEST file, + // which will write every file metadata to the new manifest file and ensure + // that the now marked-for-compaction file metadata are persisted as marked. + // NB: This call to logAndApply will unlock the MANIFEST, which we locked up + // above before obtaining `vers`. 
+ return d.mu.versions.logAndApply( + jobID, + &manifest.VersionEdit{}, + map[int]*LevelMetrics{}, + true, /* forceRotation */ + func() []compactionInfo { return d.getInProgressCompactionInfoLocked(nil) }) +} diff --git a/format_major_version_test.go b/format_major_version_test.go index f37a6a9a03..920e6cc42e 100644 --- a/format_major_version_test.go +++ b/format_major_version_test.go @@ -5,9 +5,14 @@ package pebble import ( + "bytes" "fmt" + "strconv" "testing" + "time" + "github.com/cockroachdb/pebble/internal/base" + "github.com/cockroachdb/pebble/internal/datadriven" "github.com/cockroachdb/pebble/sstable" "github.com/cockroachdb/pebble/vfs" "github.com/cockroachdb/pebble/vfs/atomicfs" @@ -191,6 +196,8 @@ func TestFormatMajorVersions_TableFormat(t *testing.T) { FormatVersioned: sstable.TableFormatRocksDBv2, FormatSetWithDelete: sstable.TableFormatRocksDBv2, FormatBlockPropertyCollector: sstable.TableFormatPebblev1, + FormatSplitUserKeysMarked: sstable.TableFormatPebblev1, + FormatMarkedCompacted: sstable.TableFormatPebblev1, FormatRangeKeys: sstable.TableFormatPebblev2, } @@ -204,3 +211,109 @@ func TestFormatMajorVersions_TableFormat(t *testing.T) { fmv := FormatNewest + 1 require.Panics(t, func() { _ = fmv.MaxTableFormat() }) } + +func TestSplitUserKeyMigration(t *testing.T) { + var d *DB + var opts *Options + var fs vfs.FS + var buf bytes.Buffer + defer func() { + if d != nil { + require.NoError(t, d.Close()) + } + }() + + datadriven.RunTest(t, "testdata/split_user_key_migration", + func(td *datadriven.TestData) string { + switch td.Cmd { + case "define": + if d != nil { + if err := d.Close(); err != nil { + return err.Error() + } + buf.Reset() + } + opts = &Options{ + FormatMajorVersion: FormatBlockPropertyCollector, + EventListener: EventListener{ + CompactionEnd: func(info CompactionInfo) { + // JobID's aren't deterministic, especially w/ table stats + // enabled. Use a fixed job ID for data-driven test output. 
+ info.JobID = 100 + fmt.Fprintln(&buf, info) + }, + }, + } + var err error + if d, err = runDBDefineCmd(td, opts); err != nil { + return err.Error() + } + + // Mock time so that we get consistent log output written to + // buf. + t := time.Now() + d.timeNow = func() time.Time { + t = t.Add(time.Second) + return t + } + fs = d.opts.FS + d.mu.Lock() + defer d.mu.Unlock() + return d.mu.versions.currentVersion().DebugString(base.DefaultFormatter) + case "reopen": + if d != nil { + if err := d.Close(); err != nil { + return err.Error() + } + buf.Reset() + } + opts.FS = fs + var err error + d, err = Open("", opts) + if err != nil { + return err.Error() + } + // Mock time so that we get consistent log output written to + // buf. + d.mu.Lock() + defer d.mu.Unlock() + t := time.Now() + d.timeNow = func() time.Time { + t = t.Add(time.Second) + return t + } + return "OK" + case "build": + if err := runBuildCmd(td, d, fs); err != nil { + return err.Error() + } + return "" + case "force-ingest": + if err := runForceIngestCmd(td, d); err != nil { + return err.Error() + } + d.mu.Lock() + defer d.mu.Unlock() + return d.mu.versions.currentVersion().DebugString(base.DefaultFormatter) + case "format-major-version": + return d.FormatMajorVersion().String() + case "ratchet-format-major-version": + v, err := strconv.Atoi(td.CmdArgs[0].String()) + if err != nil { + return err.Error() + } + if err := d.RatchetFormatMajorVersion(FormatMajorVersion(v)); err != nil { + return err.Error() + } + return buf.String() + case "lsm": + return runLSMCmd(td, d) + case "marked-file-count": + m := d.Metrics() + return fmt.Sprintf("%d files marked for compaction", m.Compact.MarkedFiles) + default: + return fmt.Sprintf("unrecognized command %q", td.Cmd) + } + + }) +} diff --git a/ingest.go b/ingest.go index 641c6e5d22..d27796263e 100644 --- a/ingest.go +++ b/ingest.go @@ -584,7 +584,10 @@ func (d *DB) Ingest(paths []string) error { if d.opts.ReadOnly { return ErrReadOnly } + return d.ingest(paths, 
ingestTargetLevel) +} +func (d *DB) ingest(paths []string, targetLevelFunc ingestTargetLevelFunc) error { // Allocate file numbers for all of the files being ingested and mark them as // pending in order to prevent them from being deleted. Note that this causes // the file number ordering to be out of alignment with sequence number @@ -681,7 +684,7 @@ func (d *DB) Ingest(paths []string) error { // Assign the sstables to the correct level in the LSM and apply the // version edit. - ve, err = d.ingestApply(jobID, meta) + ve, err = d.ingestApply(jobID, meta, targetLevelFunc) } d.commit.AllocateSeqNum(len(meta), prepare, apply) @@ -719,7 +722,17 @@ func (d *DB) Ingest(paths []string) error { return err } -func (d *DB) ingestApply(jobID int, meta []*fileMetadata) (*versionEdit, error) { +type ingestTargetLevelFunc func( + newIters tableNewIters, + iterOps IterOptions, + cmp Compare, + v *version, + baseLevel int, + compactions map[*compaction]struct{}, + meta *fileMetadata, +) (int, error) + +func (d *DB) ingestApply(jobID int, meta []*fileMetadata, findTargetLevel ingestTargetLevelFunc) (*versionEdit, error) { d.mu.Lock() defer d.mu.Unlock() @@ -744,7 +757,7 @@ func (d *DB) ingestApply(jobID int, meta []*fileMetadata) (*versionEdit, error) m := meta[i] f := &ve.NewFiles[i] var err error - f.Level, err = ingestTargetLevel(d.newIters, iterOps, d.cmp, current, baseLevel, d.mu.compact.inProgress, m) + f.Level, err = findTargetLevel(d.newIters, iterOps, d.cmp, current, baseLevel, d.mu.compact.inProgress, m) if err != nil { d.mu.versions.logUnlock() return nil, err @@ -760,7 +773,7 @@ func (d *DB) ingestApply(jobID int, meta []*fileMetadata) (*versionEdit, error) levelMetrics.BytesIngested += m.Size levelMetrics.TablesIngested++ } - if err := d.mu.versions.logAndApply(jobID, ve, metrics, func() []compactionInfo { + if err := d.mu.versions.logAndApply(jobID, ve, metrics, false /* forceRotation */, func() []compactionInfo { return d.getInProgressCompactionInfoLocked(nil) }); 
err != nil { return nil, err diff --git a/internal/manifest/btree.go b/internal/manifest/btree.go index 1a1d39256c..13ccaf577b 100644 --- a/internal/manifest/btree.go +++ b/internal/manifest/btree.go @@ -575,6 +575,26 @@ func (n *node) rebalanceOrMerge(i int) { } } +func (n *node) invalidateAnnotation(a Annotator) { + // Find this annotator's annotation on this node. + var annot *annotation + for i := range n.annot { + if n.annot[i].annotator == a { + annot = &n.annot[i] + } + } + + if annot != nil && annot.valid { + annot.valid = false + annot.v = a.Zero(annot.v) + } + if !n.leaf { + for i := int16(0); i <= n.count; i++ { + n.children[i].invalidateAnnotation(a) + } + } +} + func (n *node) annotation(a Annotator) (interface{}, bool) { // Find this annotator's annotation on this node. var annot *annotation diff --git a/internal/manifest/level_metadata.go b/internal/manifest/level_metadata.go index 153b8a014e..6e7f0f6c08 100644 --- a/internal/manifest/level_metadata.go +++ b/internal/manifest/level_metadata.go @@ -14,14 +14,16 @@ import ( // LevelMetadata contains metadata for all of the files within // a level of the LSM. type LevelMetadata struct { - tree btree + level int + tree btree } // clone makes a copy of the level metadata, implicitly increasing the ref // count of every file contained within lm. func (lm *LevelMetadata) clone() LevelMetadata { return LevelMetadata{ - tree: lm.tree.clone(), + level: lm.level, + tree: lm.tree.clone(), } } @@ -35,6 +37,7 @@ func makeLevelMetadata(cmp Compare, level int, files []*FileMetadata) LevelMetad bcmp = btreeCmpSmallestKey(cmp) } var lm LevelMetadata + lm.level = level lm.tree, _ = makeBTree(bcmp, files) return lm } @@ -68,13 +71,16 @@ func (lm *LevelMetadata) Slice() LevelSlice { return LevelSlice{iter: lm.tree.iter(), length: lm.tree.length} } -// Find finds the provided file in the level if it exists. The level must be -// key-sorted (eg, non-L0). +// Find finds the provided file in the level if it exists. 
func (lm *LevelMetadata) Find(cmp base.Compare, m *FileMetadata) *LevelFile { - // TODO(jackson): Add an assertion that lm is key-sorted. - o := overlaps(lm.Iter(), cmp, m.Smallest.UserKey, - m.Largest.UserKey, m.Largest.IsExclusiveSentinel()) - iter := o.Iter() + iter := lm.Iter() + if lm.level != 0 { + // If lm holds files for levels >0, we can narrow our search by binary + // searching by bounds. + o := overlaps(iter, cmp, m.Smallest.UserKey, + m.Largest.UserKey, m.Largest.IsExclusiveSentinel()) + iter = o.Iter() + } for f := iter.First(); f != nil; f = iter.Next() { if f == m { lf := iter.Take() @@ -97,6 +103,18 @@ func (lm *LevelMetadata) Annotation(annotator Annotator) interface{} { return v } +// InvalidateAnnotation clears any cached annotations defined by Annotator. The +// Annotator is used as the key for pre-calculated values, so equal Annotators +// must be used to clear the appropriate cached annotation. InvalidateAnnotation +// must not be called concurrently, and in practice this is achieved by +// requiring callers to hold DB.mu. +func (lm *LevelMetadata) InvalidateAnnotation(annotator Annotator) { + if lm.Empty() { + return + } + lm.tree.root.invalidateAnnotation(annotator) +} + // LevelFile holds a file's metadata along with its position // within a level of the LSM. type LevelFile struct { diff --git a/internal/manifest/version.go b/internal/manifest/version.go index e57aae5a99..c1ecb6183e 100644 --- a/internal/manifest/version.go +++ b/internal/manifest/version.go @@ -141,10 +141,23 @@ type FileMetadata struct { IsIntraL0Compacting bool // True if the file is actively being compacted. Protected by DB.mu. Compacting bool - // True if user asked us to compact this file. This flag is only set and - // respected by RocksDB but exists here to preserve its value in the - // MANIFEST. - markedForCompaction bool + // True if compaction of this file has been explicitly requested. 
+ // Previously, RocksDB and earlier versions of Pebble allowed this + // flag to be set by a user table property collector. Some earlier + // versions of Pebble respected this flag, while other more recent + // versions ignored this flag. + // + // More recently this flag has been repurposed to facilitate the + // compaction of 'atomic compaction units'. Files marked for + // compaction are compacted in a rewrite compaction at the lowest + // possible compaction priority. + // + // NB: A count of files marked for compaction is maintained on + // Version, and compaction picking reads cached annotations + // determined by this field. + // + // Protected by DB.mu. + MarkedForCompaction bool // HasPointKeys and HasRangeKeys track whether the table contains point and // range keys, respectively. HasPointKeys bool @@ -523,7 +536,7 @@ func NewVersion( // initial B-Tree, we swap out the btreeCmp for the correct one. // TODO(jackson): Adjust or remove the tests and remove this. v.Levels[l].tree, _ = makeBTree(btreeCmpSpecificOrder(files[l]), files[l]) - + v.Levels[l].level = l if l == 0 { v.Levels[l].tree.cmp = btreeCmpSeqNum } else { @@ -590,6 +603,14 @@ type Version struct { // removed. Will be called with list.mu held. Deleted func(obsolete []*FileMetadata) + // Stats holds aggregated stats about the version maintained from + // version to version. + Stats struct { + // MarkedForCompaction records the count of files marked for + // compaction within the version. + MarkedForCompaction int + } + // The list the version is linked into. 
list *VersionList diff --git a/internal/manifest/version_edit.go b/internal/manifest/version_edit.go index 6829381d68..f1d8a7ec46 100644 --- a/internal/manifest/version_edit.go +++ b/internal/manifest/version_edit.go @@ -257,24 +257,24 @@ func (v *VersionEdit) Decode(r io.Reader) error { v.NewFiles = append(v.NewFiles, NewFileEntry{ Level: level, Meta: &FileMetadata{ - FileNum: fileNum, - Size: size, - CreationTime: int64(creationTime), - SmallestPointKey: smallestPointKey, - LargestPointKey: largestPointKey, - HasPointKeys: true, - SmallestSeqNum: smallestSeqNum, - LargestSeqNum: largestSeqNum, - markedForCompaction: markedForCompaction, + FileNum: fileNum, + Size: size, + CreationTime: int64(creationTime), + SmallestPointKey: smallestPointKey, + LargestPointKey: largestPointKey, + HasPointKeys: true, + SmallestSeqNum: smallestSeqNum, + LargestSeqNum: largestSeqNum, // TODO(travers): For now the smallest and largest keys are pinned to // the smallest and largest point keys, as these are the only types of // keys supported in the manifest. This will need to change when the // manifest is updated to support range keys, which will most likely // leverage a bitset to infer which key types (points or ranges) are // used for the overall smallest and largest keys. 
- Smallest: smallestPointKey, - Largest: largestPointKey, - boundsSet: true, + Smallest: smallestPointKey, + Largest: largestPointKey, + boundsSet: true, + MarkedForCompaction: markedForCompaction, }, }) @@ -329,7 +329,7 @@ func (v *VersionEdit) Encode(w io.Writer) error { } for _, x := range v.NewFiles { var customFields bool - if x.Meta.markedForCompaction || x.Meta.CreationTime != 0 { + if x.Meta.MarkedForCompaction || x.Meta.CreationTime != 0 { customFields = true e.writeUvarint(tagNewFile4) } else { @@ -349,7 +349,7 @@ func (v *VersionEdit) Encode(w io.Writer) error { n := binary.PutUvarint(buf[:], uint64(x.Meta.CreationTime)) e.writeBytes(buf[:n]) } - if x.Meta.markedForCompaction { + if x.Meta.MarkedForCompaction { e.writeUvarint(customTagNeedsCompaction) e.writeBytes([]byte{1}) } @@ -453,6 +453,10 @@ type BulkVersionEdit struct { // uses AddedByFileNum to correctly populate the BulkVersionEdit's Deleted // field with non-nil *FileMetadata. AddedByFileNum map[base.FileNum]*FileMetadata + + // MarkedForCompactionCountDiff holds the aggregated count of files + // marked for compaction added or removed. + MarkedForCompactionCountDiff int } // Accumulate adds the file addition and deletions in the specified version @@ -475,6 +479,9 @@ func (b *BulkVersionEdit) Accumulate(ve *VersionEdit) error { return base.CorruptionErrorf("pebble: file deleted L%d.%s before it was inserted", df.Level, df.FileNum) } } + if m.MarkedForCompaction { + b.MarkedForCompactionCountDiff-- + } dmap[df.FileNum] = m } @@ -490,6 +497,9 @@ func (b *BulkVersionEdit) Accumulate(ve *VersionEdit) error { if b.AddedByFileNum != nil { b.AddedByFileNum[nf.Meta.FileNum] = nf.Meta } + if nf.Meta.MarkedForCompaction { + b.MarkedForCompactionCountDiff++ + } } return nil } @@ -525,6 +535,16 @@ func (b *BulkVersionEdit) Apply( } v := new(Version) + + // Adjust the count of files marked for compaction. 
+ if curr != nil { + v.Stats.MarkedForCompaction = curr.Stats.MarkedForCompaction + } + v.Stats.MarkedForCompaction += b.MarkedForCompactionCountDiff + if v.Stats.MarkedForCompaction < 0 { + return nil, nil, base.CorruptionErrorf("pebble: version marked for compaction count negative") + } + for level := range v.Levels { if curr == nil || curr.Levels[level].tree.root == nil { v.Levels[level] = makeLevelMetadata(cmp, level, nil /* files */) diff --git a/internal/manifest/version_edit_test.go b/internal/manifest/version_edit_test.go index b968dfd685..37cddf5627 100644 --- a/internal/manifest/version_edit_test.go +++ b/internal/manifest/version_edit_test.go @@ -57,7 +57,7 @@ func TestVersionEditRoundTrip(t *testing.T) { CreationTime: 806040, SmallestSeqNum: 3, LargestSeqNum: 5, - markedForCompaction: true, + MarkedForCompaction: true, }).ExtendPointKeyBounds( cmp, base.DecodeInternalKey([]byte("A\x00\x01\x02\x03\x04\x05\x06\x07")), diff --git a/metrics.go b/metrics.go index e4319ffd92..97153397ed 100644 --- a/metrics.go +++ b/metrics.go @@ -133,6 +133,7 @@ type Metrics struct { ElisionOnlyCount int64 MoveCount int64 ReadCount int64 + RewriteCount int64 // An estimate of the number of bytes that need to be compacted for the LSM // to reach a stable state. EstimatedDebt uint64 @@ -142,6 +143,10 @@ type Metrics struct { InProgressBytes int64 // Number of compactions that are in-progress. NumInProgress int64 + // MarkedFiles is a count of files that are marked for + // compaction. Such files are compacted in a rewrite compaction + // when no other compactions are picked. 
+ MarkedFiles int } Flush struct { @@ -375,12 +380,13 @@ func (m *Metrics) SafeFormat(w redact.SafePrinter, _ rune) { humanize.IEC.Int64(m.Compact.InProgressBytes), redact.Safe(m.Compact.NumInProgress), redact.SafeString("")) - w.Printf(" ctype %9d %7d %7d %7d %7d (default, delete, elision, move, read)\n", + w.Printf(" ctype %9d %7d %7d %7d %7d %7d (default, delete, elision, move, read, rewrite)\n", redact.Safe(m.Compact.DefaultCount), redact.Safe(m.Compact.DeleteOnlyCount), redact.Safe(m.Compact.ElisionOnlyCount), redact.Safe(m.Compact.MoveCount), - redact.Safe(m.Compact.ReadCount)) + redact.Safe(m.Compact.ReadCount), + redact.Safe(m.Compact.RewriteCount)) w.Printf(" memtbl %9d %7s\n", redact.Safe(m.MemTable.Count), humanize.IEC.Uint64(m.MemTable.Size)) diff --git a/metrics_test.go b/metrics_test.go index 6874588d4b..c571b88fd8 100644 --- a/metrics_test.go +++ b/metrics_test.go @@ -27,6 +27,7 @@ func TestMetricsFormat(t *testing.T) { m.Compact.ElisionOnlyCount = 29 m.Compact.MoveCount = 30 m.Compact.ReadCount = 31 + m.Compact.RewriteCount = 32 m.Compact.EstimatedDebt = 6 m.Compact.InProgressBytes = 7 m.Compact.NumInProgress = 2 @@ -84,7 +85,7 @@ __level_____count____size___score______in__ingest(sz_cnt)____move(sz_cnt)___writ total 2807 2.7 K - 2.8 K 2.8 K 2.9 K 2.8 K 2.9 K 8.4 K 5.7 K 2.8 K 28 3.0 flush 8 compact 5 6 B 7 B 2 (size == estimated-debt, score = in-progress-bytes, in = num-in-progress) - ctype 27 28 29 30 31 (default, delete, elision, move, read) + ctype 27 28 29 30 31 32 (default, delete, elision, move, read, rewrite) memtbl 12 11 B zmemtbl 14 13 B ztbl 16 15 B @@ -227,7 +228,7 @@ __level_____count____size___score______in__ingest(sz_cnt)____move(sz_cnt)___writ total 0 0 B - 0 B 0 B 0 0 B 0 0 B 0 0 B 0 0.0 flush 0 compact 0 0 B 0 B 0 (size == estimated-debt, score = in-progress-bytes, in = num-in-progress) - ctype 0 0 0 0 0 (default, delete, elision, move, read) + ctype 0 0 0 0 0 0 (default, delete, elision, move, read, rewrite) memtbl 0 0 B zmemtbl 0 
0 B ztbl 0 0 B diff --git a/open.go b/open.go index 5f98df058f..036df5ba61 100644 --- a/open.go +++ b/open.go @@ -237,7 +237,6 @@ func Open(dirname string, opts *Options) (db *DB, _ error) { if err := d.mu.versions.currentVersion().CheckConsistency(dirname, opts.FS); err != nil { return nil, err } - } // If the Options specify a format major version higher than the @@ -370,7 +369,7 @@ func Open(dirname string, opts *Options) (db *DB, _ error) { // crash before the manifest is synced could leave two WALs with // unclean tails. d.mu.versions.logLock() - if err := d.mu.versions.logAndApply(jobID, &ve, newFileMetrics(ve.NewFiles), func() []compactionInfo { + if err := d.mu.versions.logAndApply(jobID, &ve, newFileMetrics(ve.NewFiles), false /* forceRotation */, func() []compactionInfo { return nil }); err != nil { return nil, err diff --git a/open_test.go b/open_test.go index 68e34ec235..2bc0d819d6 100644 --- a/open_test.go +++ b/open_test.go @@ -101,7 +101,7 @@ func TestNewDBFilenames(t *testing.T) { "LOCK", "MANIFEST-000001", "OPTIONS-000003", - "marker.format-version.000005.006", + "marker.format-version.000007.008", "marker.manifest.000001.MANIFEST-000001", }, } diff --git a/testdata/checkpoint b/testdata/checkpoint index f35759ef82..61ae796975 100644 --- a/testdata/checkpoint +++ b/testdata/checkpoint @@ -34,6 +34,12 @@ sync: db create: db/marker.format-version.000005.006 close: db/marker.format-version.000005.006 sync: db +create: db/marker.format-version.000006.007 +close: db/marker.format-version.000006.007 +sync: db +create: db/marker.format-version.000007.008 +close: db/marker.format-version.000007.008 +sync: db sync: db/MANIFEST-000001 create: db/000002.log sync: db @@ -99,9 +105,9 @@ close: open-dir: checkpoints/checkpoint1 link: db/OPTIONS-000003 -> checkpoints/checkpoint1/OPTIONS-000003 open-dir: checkpoints/checkpoint1 -create: checkpoints/checkpoint1/marker.format-version.000001.006 -sync: checkpoints/checkpoint1/marker.format-version.000001.006 -close: 
checkpoints/checkpoint1/marker.format-version.000001.006 +create: checkpoints/checkpoint1/marker.format-version.000001.008 +sync: checkpoints/checkpoint1/marker.format-version.000001.008 +close: checkpoints/checkpoint1/marker.format-version.000001.008 sync: checkpoints/checkpoint1 close: checkpoints/checkpoint1 create: checkpoints/checkpoint1/MANIFEST-000001 @@ -156,7 +162,7 @@ CURRENT LOCK MANIFEST-000001 OPTIONS-000003 -marker.format-version.000005.006 +marker.format-version.000007.008 marker.manifest.000001.MANIFEST-000001 list checkpoints/checkpoint1 @@ -166,7 +172,7 @@ list checkpoints/checkpoint1 000007.sst MANIFEST-000001 OPTIONS-000003 -marker.format-version.000001.006 +marker.format-version.000001.008 marker.manifest.000001.MANIFEST-000001 open checkpoints/checkpoint1 readonly diff --git a/testdata/compaction_picker_L0 b/testdata/compaction_picker_L0 index ece043f8a2..a14f7ab34d 100644 --- a/testdata/compaction_picker_L0 +++ b/testdata/compaction_picker_L0 @@ -389,3 +389,25 @@ pick-auto ---- L0 -> L6 L0: 000051,000053 + +# At low priority, find and compact marked-for-compaction files. 
+ +define +L0 + 000049:t.SET.22-t.SET.22 +L6 + 000045:f.SET.0-x.SET.0 +---- +0.0: + 000049:[t#22,SET-t#22,SET] +6: + 000045:[f#0,SET-x#0,SET] + +mark-for-compaction file=000049 +---- +marked L0.000049 + +pick-auto l0_compaction_threshold=1000 +---- +L0 -> L0 +L0: 000049 diff --git a/testdata/event_listener b/testdata/event_listener index 67f3369b24..03dd585649 100644 --- a/testdata/event_listener +++ b/testdata/event_listener @@ -42,6 +42,14 @@ create: db/marker.format-version.000005.006 close: db/marker.format-version.000005.006 sync: db upgraded to format version: 006 +create: db/marker.format-version.000006.007 +close: db/marker.format-version.000006.007 +sync: db +upgraded to format version: 007 +create: db/marker.format-version.000007.008 +close: db/marker.format-version.000007.008 +sync: db +upgraded to format version: 008 create: db/MANIFEST-000003 close: db/MANIFEST-000001 sync: db/MANIFEST-000003 @@ -65,10 +73,10 @@ sync: wal/000002.log close: wal/000002.log create: wal/000005.log sync: wal -[JOB 2] WAL created 000005 -[JOB 3] flushing 1 memtable to L0 +[JOB 3] WAL created 000005 +[JOB 4] flushing 1 memtable to L0 create: db/000006.sst -[JOB 3] flushing: sstable created 000006 +[JOB 4] flushing: sstable created 000006 sync: db/000006.sst close: db/000006.sst sync: db @@ -78,9 +86,9 @@ sync: db/MANIFEST-000007 create: db/marker.manifest.000003.MANIFEST-000007 close: db/marker.manifest.000003.MANIFEST-000007 sync: db -[JOB 3] MANIFEST created 000007 -[JOB 3] flushed 1 memtable to L0 [000006] (770 B), in 1.0s (2.0s total), output rate 770 B/s -[JOB 3] MANIFEST deleted 000001 +[JOB 4] MANIFEST created 000007 +[JOB 4] flushed 1 memtable to L0 [000006] (770 B), in 1.0s (2.0s total), output rate 770 B/s +[JOB 4] MANIFEST deleted 000001 compact ---- @@ -89,10 +97,10 @@ sync: wal/000005.log close: wal/000005.log reuseForWrite: wal/000002.log -> wal/000008.log sync: wal -[JOB 4] WAL created 000008 (recycled 000002) -[JOB 5] flushing 1 memtable to L0 +[JOB 5] WAL 
created 000008 (recycled 000002) +[JOB 6] flushing 1 memtable to L0 create: db/000009.sst -[JOB 5] flushing: sstable created 000009 +[JOB 6] flushing: sstable created 000009 sync: db/000009.sst close: db/000009.sst sync: db @@ -102,12 +110,12 @@ sync: db/MANIFEST-000010 create: db/marker.manifest.000004.MANIFEST-000010 close: db/marker.manifest.000004.MANIFEST-000010 sync: db -[JOB 5] MANIFEST created 000010 -[JOB 5] flushed 1 memtable to L0 [000009] (770 B), in 1.0s (2.0s total), output rate 770 B/s -[JOB 5] MANIFEST deleted 000003 -[JOB 6] compacting(default) L0 [000006 000009] (1.5 K) + L6 [] (0 B) +[JOB 6] MANIFEST created 000010 +[JOB 6] flushed 1 memtable to L0 [000009] (770 B), in 1.0s (2.0s total), output rate 770 B/s +[JOB 6] MANIFEST deleted 000003 +[JOB 7] compacting(default) L0 [000006 000009] (1.5 K) + L6 [] (0 B) create: db/000011.sst -[JOB 6] compacting: sstable created 000011 +[JOB 7] compacting: sstable created 000011 sync: db/000011.sst close: db/000011.sst sync: db @@ -117,11 +125,11 @@ sync: db/MANIFEST-000012 create: db/marker.manifest.000005.MANIFEST-000012 close: db/marker.manifest.000005.MANIFEST-000012 sync: db -[JOB 6] MANIFEST created 000012 -[JOB 6] compacted(default) L0 [000006 000009] (1.5 K) + L6 [] (0 B) -> L6 [000011] (770 B), in 1.0s (2.0s total), output rate 770 B/s -[JOB 6] sstable deleted 000006 -[JOB 6] sstable deleted 000009 -[JOB 6] MANIFEST deleted 000007 +[JOB 7] MANIFEST created 000012 +[JOB 7] compacted(default) L0 [000006 000009] (1.5 K) + L6 [] (0 B) -> L6 [000011] (770 B), in 1.0s (2.0s total), output rate 770 B/s +[JOB 7] sstable deleted 000006 +[JOB 7] sstable deleted 000009 +[JOB 7] MANIFEST deleted 000007 disable-file-deletions ---- @@ -133,10 +141,10 @@ sync: wal/000008.log close: wal/000008.log reuseForWrite: wal/000005.log -> wal/000013.log sync: wal -[JOB 7] WAL created 000013 (recycled 000005) -[JOB 8] flushing 1 memtable to L0 +[JOB 8] WAL created 000013 (recycled 000005) +[JOB 9] flushing 1 memtable to L0 
create: db/000014.sst -[JOB 8] flushing: sstable created 000014 +[JOB 9] flushing: sstable created 000014 sync: db/000014.sst close: db/000014.sst sync: db @@ -146,17 +154,17 @@ sync: db/MANIFEST-000015 create: db/marker.manifest.000006.MANIFEST-000015 close: db/marker.manifest.000006.MANIFEST-000015 sync: db -[JOB 8] MANIFEST created 000015 -[JOB 8] flushed 1 memtable to L0 [000014] (770 B), in 1.0s (2.0s total), output rate 770 B/s +[JOB 9] MANIFEST created 000015 +[JOB 9] flushed 1 memtable to L0 [000014] (770 B), in 1.0s (2.0s total), output rate 770 B/s enable-file-deletions ---- -[JOB 9] MANIFEST deleted 000010 +[JOB 10] MANIFEST deleted 000010 ingest ---- link: ext/0 -> db/000016.sst -[JOB 10] ingesting: sstable created 000016 +[JOB 11] ingesting: sstable created 000016 sync: db create: db/MANIFEST-000017 close: db/MANIFEST-000015 @@ -164,9 +172,9 @@ sync: db/MANIFEST-000017 create: db/marker.manifest.000007.MANIFEST-000017 close: db/marker.manifest.000007.MANIFEST-000017 sync: db -[JOB 10] MANIFEST created 000017 -[JOB 10] MANIFEST deleted 000012 -[JOB 10] ingested L0:000016 (825 B) +[JOB 11] MANIFEST created 000017 +[JOB 11] MANIFEST deleted 000012 +[JOB 11] ingested L0:000016 (825 B) metrics ---- @@ -182,7 +190,7 @@ __level_____count____size___score______in__ingest(sz_cnt)____move(sz_cnt)___writ total 3 2.3 K - 933 B 825 B 1 0 B 0 3.9 K 4 1.5 K 3 4.3 flush 3 compact 1 2.3 K 0 B 0 (size == estimated-debt, score = in-progress-bytes, in = num-in-progress) - ctype 1 0 0 0 0 (default, delete, elision, move, read) + ctype 1 0 0 0 0 0 (default, delete, elision, move, read, rewrite) memtbl 1 256 K zmemtbl 0 0 B ztbl 0 0 B @@ -209,9 +217,9 @@ close: open-dir: checkpoint link: db/OPTIONS-000004 -> checkpoint/OPTIONS-000004 open-dir: checkpoint -create: checkpoint/marker.format-version.000001.006 -sync: checkpoint/marker.format-version.000001.006 -close: checkpoint/marker.format-version.000001.006 +create: checkpoint/marker.format-version.000001.008 +sync: 
checkpoint/marker.format-version.000001.008 +close: checkpoint/marker.format-version.000001.008 sync: checkpoint close: checkpoint create: checkpoint/MANIFEST-000017 diff --git a/testdata/ingest b/testdata/ingest index 3018d16950..89fd2133c4 100644 --- a/testdata/ingest +++ b/testdata/ingest @@ -43,7 +43,7 @@ __level_____count____size___score______in__ingest(sz_cnt)____move(sz_cnt)___writ total 1 833 B - 833 B 833 B 1 0 B 0 833 B 0 0 B 1 1.0 flush 0 compact 0 0 B 0 B 0 (size == estimated-debt, score = in-progress-bytes, in = num-in-progress) - ctype 0 0 0 0 0 (default, delete, elision, move, read) + ctype 0 0 0 0 0 0 (default, delete, elision, move, read, rewrite) memtbl 1 256 K zmemtbl 0 0 B ztbl 0 0 B diff --git a/testdata/ingest_load b/testdata/ingest_load index 4b082c3cb3..bd2d91da2c 100644 --- a/testdata/ingest_load +++ b/testdata/ingest_load @@ -89,11 +89,11 @@ a.RANGEDEL.0:b ranges: #0,0-#0,0 # Loading tables at an unsupported table format results in an error. -# Write a table at version 6 (Pebble,v2) into a DB at version 5 (Pebble,v1). -load writer-version=6 db-version=5 +# Write a table at version 8 (Pebble,v2) into a DB at version 7 (Pebble,v1). +load writer-version=8 db-version=7 a.SET.1: ---- -pebble: table with format (Pebble,v2) unsupported at DB format major version 5, (Pebble,v1) +pebble: table with format (Pebble,v2) unsupported at DB format major version 7, (Pebble,v1) # Tables with range keys only. 
diff --git a/testdata/marked_for_compaction b/testdata/marked_for_compaction new file mode 100644 index 0000000000..5464a24aa8 --- /dev/null +++ b/testdata/marked_for_compaction @@ -0,0 +1,28 @@ +define +L0 + c.SET.11:foo +L1 + c.SET.0:foo + d.SET.0:foo +---- +0.0: + 000004:[c#11,SET-c#11,SET] points:[c#11,SET-c#11,SET] +1: + 000005:[c#0,SET-d#0,SET] points:[c#0,SET-d#0,SET] + +mark-for-compaction file=000005 +---- +marked L1.000005 + +mark-for-compaction file=000004 +---- +marked L0.000004 + +maybe-compact +---- +[JOB 100] compacted(rewrite) L1 [000005] (779 B) + L1 [] (0 B) -> L1 [000006] (779 B), in 1.0s (2.0s total), output rate 779 B/s +[JOB 100] compacted(rewrite) L0 [000004] (773 B) + L0 [] (0 B) -> L0 [000007] (773 B), in 1.0s (2.0s total), output rate 773 B/s +0.0: + 000007:[c#11,SET-c#11,SET] points:[c#11,SET-c#11,SET] +1: + 000006:[c#0,SET-d#0,SET] points:[c#0,SET-d#0,SET] diff --git a/testdata/metrics b/testdata/metrics index b4f9891b8d..adcf53ee55 100644 --- a/testdata/metrics +++ b/testdata/metrics @@ -29,7 +29,7 @@ __level_____count____size___score______in__ingest(sz_cnt)____move(sz_cnt)___writ total 1 771 B - 56 B 0 B 0 0 B 0 827 B 1 0 B 1 14.8 flush 1 compact 0 0 B 0 B 0 (size == estimated-debt, score = in-progress-bytes, in = num-in-progress) - ctype 0 0 0 0 0 (default, delete, elision, move, read) + ctype 0 0 0 0 0 0 (default, delete, elision, move, read, rewrite) memtbl 1 256 K zmemtbl 1 256 K ztbl 0 0 B @@ -77,7 +77,7 @@ __level_____count____size___score______in__ingest(sz_cnt)____move(sz_cnt)___writ total 1 778 B - 84 B 0 B 0 0 B 0 2.3 K 3 1.5 K 1 28.6 flush 2 compact 1 0 B 0 B 0 (size == estimated-debt, score = in-progress-bytes, in = num-in-progress) - ctype 1 0 0 0 0 (default, delete, elision, move, read) + ctype 1 0 0 0 0 0 (default, delete, elision, move, read, rewrite) memtbl 1 256 K zmemtbl 2 512 K ztbl 2 1.5 K @@ -110,7 +110,7 @@ __level_____count____size___score______in__ingest(sz_cnt)____move(sz_cnt)___writ total 1 778 B - 84 B 0 B 0 
0 B 0 2.3 K 3 1.5 K 1 28.6 flush 2 compact 1 0 B 0 B 0 (size == estimated-debt, score = in-progress-bytes, in = num-in-progress) - ctype 1 0 0 0 0 (default, delete, elision, move, read) + ctype 1 0 0 0 0 0 (default, delete, elision, move, read, rewrite) memtbl 1 256 K zmemtbl 1 256 K ztbl 2 1.5 K @@ -140,7 +140,7 @@ __level_____count____size___score______in__ingest(sz_cnt)____move(sz_cnt)___writ total 1 778 B - 84 B 0 B 0 0 B 0 2.3 K 3 1.5 K 1 28.6 flush 2 compact 1 0 B 0 B 0 (size == estimated-debt, score = in-progress-bytes, in = num-in-progress) - ctype 1 0 0 0 0 (default, delete, elision, move, read) + ctype 1 0 0 0 0 0 (default, delete, elision, move, read, rewrite) memtbl 1 256 K zmemtbl 1 256 K ztbl 1 771 B @@ -173,7 +173,7 @@ __level_____count____size___score______in__ingest(sz_cnt)____move(sz_cnt)___writ total 1 778 B - 84 B 0 B 0 0 B 0 2.3 K 3 1.5 K 1 28.6 flush 2 compact 1 0 B 0 B 0 (size == estimated-debt, score = in-progress-bytes, in = num-in-progress) - ctype 1 0 0 0 0 (default, delete, elision, move, read) + ctype 1 0 0 0 0 0 (default, delete, elision, move, read, rewrite) memtbl 1 256 K zmemtbl 0 0 B ztbl 0 0 B diff --git a/testdata/rangekeys b/testdata/rangekeys index 98eb0bce45..0794e00c7c 100644 --- a/testdata/rangekeys +++ b/testdata/rangekeys @@ -508,24 +508,24 @@ x: valid (., [x-z) @5=boop) x: valid (., [x-z) @5=boop) # Applying range keys to a DB running with a version that doesn't support them -# results in an error. Range keys were added in version 6. -reset format-major-version=5 +# results in an error. Range keys were added in version 8. +reset format-major-version=6 ---- batch range-key-set a z @5 boop ---- -pebble: range keys require at least format major version 6 (current: 5) +pebble: range keys require at least format major version 8 (current: 6) # Constructing iterator over range keys on a DB that doesn't support them # results in an error. 
-reset format-major-version=5 +reset format-major-version=6 ---- combined-iter ---- -pebble: range keys require at least format major version 6 (current: 5) +pebble: range keys require at least format major version 8 (current: 6) # Test iterator bounds provided via IterOptions. diff --git a/testdata/split_user_key_migration b/testdata/split_user_key_migration new file mode 100644 index 0000000000..78a3ccead0 --- /dev/null +++ b/testdata/split_user_key_migration @@ -0,0 +1,148 @@ +define +L1 +d.SET.110:d e.SET.140:e +---- +1: + 000004:[d#110,SET-e#140,SET] points:[d#110,SET-e#140,SET] + +reopen +---- +OK + +# The current public Pebble interface offers no way of constructing a multi-file +# atomic compaction unit, so use the force-ingest command to force an ingestion +# into L1. + +build ef +set e e +set f f +---- + +force-ingest paths=(ef) level=1 +---- +1: + 000004:[d#110,SET-e#140,SET] points:[d#110,SET-e#140,SET] + 000008:[e#1,SET-f#1,SET] points:[e#1,SET-f#1,SET] + +format-major-version +---- +005 + +marked-file-count +---- +0 files marked for compaction + +ratchet-format-major-version 006 +---- + +format-major-version +---- +006 + +# Upgrading to format major version 006 should've marked files for compaction. + +marked-file-count +---- +2 files marked for compaction + +reopen +---- +OK + +format-major-version +---- +006 + +# Ensure the files previously marked for compaction are still marked for +# compaction. + +marked-file-count +---- +2 files marked for compaction + +# Ratcheting to 007 should force compaction of any files still marked for +# compaction. + +ratchet-format-major-version 007 +---- +[JOB 100] compacted(rewrite) L1 [000004 000008] (1.6 K) + L1 [] (0 B) -> L1 [000013] (786 B), in 1.0s (2.0s total), output rate 786 B/s + +format-major-version +---- +007 + +lsm +---- +1: + 000013:[d#0,SET-f#0,SET] + +# Reset to a new LSM. 
+ +define +L1 +b.SET.0:b c.SET.5:c +L1 +m.SET.0:m l.SET.5:l +L1 +x.SET.0:x y.SET.5:y +---- +1: + 000004:[b#0,SET-c#5,SET] points:[b#0,SET-c#5,SET] + 000005:[l#5,SET-m#0,SET] points:[l#5,SET-m#0,SET] + 000006:[x#0,SET-y#5,SET] points:[x#0,SET-y#5,SET] + +build ab +set a a +set b b +---- + +build cd +set c c +set d d +---- + +build wx +set w w +set x x +---- + +force-ingest paths=(ab, cd, wx) level=1 +---- +1: + 000007:[a#1,SET-b#1,SET] points:[a#1,SET-b#1,SET] + 000004:[b#0,SET-c#5,SET] points:[b#0,SET-c#5,SET] + 000008:[c#2,SET-d#2,SET] points:[c#2,SET-d#2,SET] + 000005:[l#5,SET-m#0,SET] points:[l#5,SET-m#0,SET] + 000009:[w#3,SET-x#3,SET] points:[w#3,SET-x#3,SET] + 000006:[x#0,SET-y#5,SET] points:[x#0,SET-y#5,SET] + +format-major-version +---- +005 + +ratchet-format-major-version 006 +---- + +format-major-version +---- +006 + +marked-file-count +---- +5 files marked for compaction + +ratchet-format-major-version 007 +---- +[JOB 100] compacted(rewrite) L1 [000007 000004 000008] (2.4 K) + L1 [] (0 B) -> L1 [000011] (794 B), in 1.0s (2.0s total), output rate 794 B/s +[JOB 100] compacted(rewrite) L1 [000009 000006] (1.6 K) + L1 [] (0 B) -> L1 [000012] (786 B), in 1.0s (2.0s total), output rate 786 B/s + +lsm +---- +1: + 000011:[a#0,SET-d#0,SET] + 000005:[l#5,SET-m#0,SET] + 000012:[w#0,SET-y#0,SET] + +format-major-version +---- +007 diff --git a/tool/testdata/db_lsm b/tool/testdata/db_lsm index ae4e3cf758..d60f1d8c0d 100644 --- a/tool/testdata/db_lsm +++ b/tool/testdata/db_lsm @@ -22,7 +22,7 @@ __level_____count____size___score______in__ingest(sz_cnt)____move(sz_cnt)___writ total 1 986 B - 0 B 0 B 0 0 B 0 0 B 0 0 B 0 0.0 flush 0 compact 0 0 B 0 B 0 (size == estimated-debt, score = in-progress-bytes, in = num-in-progress) - ctype 0 0 0 0 0 (default, delete, elision, move, read) + ctype 0 0 0 0 0 0 (default, delete, elision, move, read, rewrite) memtbl 1 256 K zmemtbl 0 0 B ztbl 0 0 B diff --git a/version_set.go b/version_set.go index 18b655e28f..bd5091ef9e 100644 --- 
a/version_set.go +++ b/version_set.go @@ -346,6 +346,7 @@ func (vs *versionSet) logAndApply( jobID int, ve *versionEdit, metrics map[int]*LevelMetrics, + forceRotation bool, inProgressCompactions func() []compactionInfo, ) error { if !vs.writing { @@ -394,7 +395,7 @@ func (vs *versionSet) logAndApply( // is too large. var newManifestFileNum FileNum var prevManifestFileSize uint64 - if vs.manifest == nil || vs.manifest.Size() >= vs.opts.MaxManifestFileSize { + if forceRotation || vs.manifest == nil || vs.manifest.Size() >= vs.opts.MaxManifestFileSize { newManifestFileNum = vs.getNextFileNum() prevManifestFileSize = uint64(vs.manifest.Size()) } @@ -550,6 +551,10 @@ func (vs *versionSet) incrementCompactions(kind compactionKind) { case compactionKindRead: vs.metrics.Compact.Count++ vs.metrics.Compact.ReadCount++ + + case compactionKindRewrite: + vs.metrics.Compact.Count++ + vs.metrics.Compact.RewriteCount++ } }