diff --git a/obsolete_files.go b/obsolete_files.go
index 1f69ed5e0c..8852455fa8 100644
--- a/obsolete_files.go
+++ b/obsolete_files.go
@@ -338,7 +338,7 @@ func (d *DB) onObsoleteTableDelete(fileSize uint64, isLocal bool) {
 // are not actually deleted by this method. A subsequent call to
 // deleteObsoleteFiles must be performed. Must not be called concurrently
 // with compactions and flushes. db.mu must be held when calling this function.
-func (d *DB) scanObsoleteFiles(list []string) {
+func (d *DB) scanObsoleteFiles(list []string, flushableIngests []*ingestedFlushable) {
 	// Disable automatic compactions temporarily to avoid concurrent compactions /
 	// flushes from interfering. The original value is restored on completion.
 	disabledPrev := d.opts.DisableAutomaticCompactions
@@ -356,14 +356,11 @@ func (d *DB) scanObsoleteFiles(list []string) {
 	d.mu.versions.addLiveFileNums(liveFileNums)
 	// Protect against files which are only referred to by the ingestedFlushable
 	// from being deleted. These are added to the flushable queue on WAL replay
-	// during read only mode and aren't part of the Version. Note that if
-	// !d.opts.ReadOnly, then all flushables of type ingestedFlushable have
-	// already been flushed.
-	for _, fEntry := range d.mu.mem.queue {
-		if f, ok := fEntry.flushable.(*ingestedFlushable); ok {
-			for _, file := range f.files {
-				liveFileNums[file.FileBacking.DiskFileNum] = struct{}{}
-			}
+	// and handle their own obsoletion/deletion. We exclude them from this obsolete
+	// file scan to avoid double-deleting these files.
+	for _, f := range flushableIngests {
+		for _, file := range f.files {
+			liveFileNums[file.FileBacking.DiskFileNum] = struct{}{}
 		}
 	}
 
diff --git a/open.go b/open.go
index 7bf96b1ad1..3670ceaaae 100644
--- a/open.go
+++ b/open.go
@@ -232,6 +232,11 @@ func Open(dirname string, opts *Options) (db *DB, err error) {
 	// allocated memory resources. Note that rather than look for an error, we
 	// look for the return of a nil DB pointer.
 	if r := recover(); db == nil {
+		// If there's an unused, recycled memtable, we need to release its memory.
+		if obsoleteMemTable := d.memTableRecycle.Swap(nil); obsoleteMemTable != nil {
+			d.freeMemTable(obsoleteMemTable)
+		}
+
 		// Release our references to the Cache. Note that both the DB, and
 		// tableCache have a reference. When we release the reference to
 		// the tableCache, and if there are no other references to
@@ -492,8 +497,7 @@ func Open(dirname string, opts *Options) (db *DB, err error) {
 			break
 		}
 	}
-	var ve versionEdit
-	var toFlush flushableList
+	var flushableIngests []*ingestedFlushable
 	for i, lf := range replayWALs {
 		// WALs other than the last one would have been closed cleanly.
 		//
@@ -502,41 +506,33 @@ func Open(dirname string, opts *Options) (db *DB, err error) {
 		// 20.1 do not guarantee that closed WALs end cleanly. But the earliest
 		// compatible Pebble format is newer and guarantees a clean EOF.
 		strictWALTail := i < len(replayWALs)-1
-		flush, maxSeqNum, err := d.replayWAL(jobID, &ve, lf, strictWALTail)
+		fi, maxSeqNum, err := d.replayWAL(jobID, lf, strictWALTail)
 		if err != nil {
 			return nil, err
 		}
-		toFlush = append(toFlush, flush...)
+		if len(fi) > 0 {
+			flushableIngests = append(flushableIngests, fi...)
+		}
 		if d.mu.versions.logSeqNum.Load() < maxSeqNum {
 			d.mu.versions.logSeqNum.Store(maxSeqNum)
 		}
 	}
+	if d.mu.mem.mutable == nil {
+		// Recreate the mutable memtable if replayWAL got rid of it.
+ var entry *flushableEntry + d.mu.mem.mutable, entry = d.newMemTable(d.mu.versions.getNextDiskFileNum(), d.mu.versions.logSeqNum.Load(), 0 /* minSize */) + d.mu.mem.queue = append(d.mu.mem.queue, entry) + } d.mu.versions.visibleSeqNum.Store(d.mu.versions.logSeqNum.Load()) if !d.opts.ReadOnly { - // Create an empty .log file. - newLogNum := d.mu.versions.getNextDiskFileNum() - - // This logic is slightly different than RocksDB's. Specifically, RocksDB - // sets MinUnflushedLogNum to max-recovered-log-num + 1. We set it to the - // newLogNum. There should be no difference in using either value. - ve.MinUnflushedLogNum = newLogNum - - // Create the manifest with the updated MinUnflushedLogNum before - // creating the new log file. If we created the log file first, a - // crash before the manifest is synced could leave two WALs with - // unclean tails. - d.mu.versions.logLock() - if err := d.mu.versions.logAndApply(jobID, &ve, newFileMetrics(ve.NewFiles), false /* forceRotation */, func() []compactionInfo { - return nil - }); err != nil { - return nil, err - } - - for _, entry := range toFlush { - entry.readerUnrefLocked(true) + d.maybeScheduleFlush() + for d.mu.compact.flushing { + d.mu.compact.cond.Wait() } + // Create an empty .log file for the mutable memtable. + newLogNum := d.mu.versions.getNextDiskFileNum() d.mu.log.writer, err = d.mu.log.manager.Create(wal.NumWAL(newLogNum), int(jobID)) if err != nil { return nil, err @@ -603,7 +599,13 @@ func Open(dirname string, opts *Options) (db *DB, err error) { } if !d.opts.ReadOnly { - d.scanObsoleteFiles(ls) + // Get a fresh list of files, in case some of the earlier flushes/compactions + // have deleted some files. + ls, err := opts.FS.List(dirname) + if err != nil { + return nil, err + } + d.scanObsoleteFiles(ls, flushableIngests) d.deleteObsoleteFiles(jobID) } // Else, nothing is obsolete. @@ -760,11 +762,68 @@ func GetVersion(dir string, fs vfs.FS) (string, error) { return version, nil } +func (d *DB) replayIngestedFlushable( + b *Batch, logNum base.DiskFileNum, +) (entry *flushableEntry, err error) { + br := b.Reader() + seqNum := b.SeqNum() + + fileNums := make([]base.DiskFileNum, 0, b.Count()) + addFileNum := func(encodedFileNum []byte) { + fileNum, n := binary.Uvarint(encodedFileNum) + if n <= 0 { + panic("pebble: ingest sstable file num is invalid") + } + fileNums = append(fileNums, base.DiskFileNum(fileNum)) + } + + for i := 0; i < int(b.Count()); i++ { + kind, encodedKey, _, ok, err := br.Next() + if err != nil { + return nil, err + } + if kind != InternalKeyKindIngestSST { + panic("pebble: invalid batch key kind") + } + if !ok { + panic("pebble: invalid batch count") + } + addFileNum(encodedKey) + } + + if _, _, _, ok, err := br.Next(); err != nil { + return nil, err + } else if ok { + panic("pebble: invalid number of entries in batch") + } + + meta := make([]*fileMetadata, len(fileNums)) + for i, n := range fileNums { + readable, err := d.objProvider.OpenForReading(context.TODO(), fileTypeTable, n, objstorage.OpenOptions{MustExist: true}) + if err != nil { + return nil, errors.Wrap(err, "pebble: error when opening flushable ingest files") + } + // NB: ingestLoad1 will close readable. 
+		meta[i], err = ingestLoad1(context.TODO(), d.opts, d.FormatMajorVersion(), readable, d.cacheID, base.PhysicalTableFileNum(n))
+		if err != nil {
+			return nil, errors.Wrap(err, "pebble: error when loading flushable ingest files")
+		}
+	}
+
+	numFiles := len(meta)
+	if uint32(numFiles) != b.Count() {
+		panic("pebble: couldn't load all files in WAL entry")
+	}
+
+	return d.newIngestedFlushableEntry(meta, seqNum, logNum, KeyRange{})
+}
+
 // replayWAL replays the edits in the specified WAL. If the DB is in read
 // only mode, then the WALs are replayed into memtables and not flushed. If
 // the DB is not in read only mode, then the contents of the WAL are
-// guaranteed to be flushed. Note that this flushing is very important for
-// guaranteeing durability: the application may have had a number of pending
+// guaranteed to be flushed when a flush is scheduled after this method is run.
+// Note that this flushing is very important for guaranteeing durability:
+// the application may have had a number of pending
 // fsyncs to the WAL before the process crashed, and those fsyncs may not have
 // happened but the corresponding data may now be readable from the WAL (while
 // sitting in write-back caches in the kernel or the storage device). By
@@ -772,16 +831,15 @@ func GetVersion(dir string, fs vfs.FS) (string, error) {
 // these changes (flush does fsyncs), we are able to guarantee that the
 // initial state of the DB is durable.
 //
-// The toFlush return value is a list of flushables associated with the WAL
-// being replayed which will be flushed. Once the version edit has been applied
-// to the manifest, it is up to the caller of replayWAL to unreference the
-// toFlush flushables returned by replayWAL.
+// This method mutates d.mu.mem.queue and possibly d.mu.mem.mutable, replaying
+// WALs into the flushable queue. Flushing of the queue is expected to be handled
+// by callers. Returns the flushable ingests (but not memtables) that were replayed.
 //
 // d.mu must be held when calling this, but the mutex may be dropped and
 // re-acquired during the course of this method.
 func (d *DB) replayWAL(
-	jobID JobID, ve *versionEdit, ll wal.LogicalLog, strictWALTail bool,
-) (toFlush flushableList, maxSeqNum base.SeqNum, err error) {
+	jobID JobID, ll wal.LogicalLog, strictWALTail bool,
+) (flushableIngests []*ingestedFlushable, maxSeqNum base.SeqNum, err error) {
 	rr := ll.OpenForRead()
 	defer rr.Close()
 	var (
@@ -800,20 +858,16 @@ func (d *DB) replayWAL(
 	// panicking where the error points to a Pebble bug and not user or
 	// hardware-induced corruption.
 
-	if d.opts.ReadOnly {
-		// In read-only mode, we replay directly into the mutable memtable which will
-		// never be flushed.
-		mem = d.mu.mem.mutable
-		if mem != nil {
-			entry = d.mu.mem.queue[len(d.mu.mem.queue)-1]
-		}
-	}
-
-	// Flushes the current memtable, if not nil.
+	// "Flushes" (i.e., closes off) the current memtable, if not nil.
 	flushMem := func() {
 		if mem == nil {
 			return
 		}
+		mem.writerUnref()
+		if d.mu.mem.mutable == mem {
+			d.mu.mem.mutable = nil
+		}
+		entry.flushForced = !d.opts.ReadOnly
 		var logSize uint64
 		mergedOffset := offset.Physical + offset.PreviousFilesBytes
 		if mergedOffset >= lastFlushOffset {
@@ -823,41 +877,27 @@ func (d *DB) replayWAL(
 		// been empty, but we need to flush it since we don't want to add to it later.
lastFlushOffset = mergedOffset entry.logSize = logSize + mem, entry = nil, nil + } + + mem = d.mu.mem.mutable + if mem != nil { + entry = d.mu.mem.queue[len(d.mu.mem.queue)-1] if !d.opts.ReadOnly { - toFlush = append(toFlush, entry) + flushMem() } - mem, entry = nil, nil } + // Creates a new memtable if there is no current memtable. ensureMem := func(seqNum base.SeqNum) { if mem != nil { return } mem, entry = d.newMemTable(base.DiskFileNum(ll.Num), seqNum, 0 /* minSize */) - if d.opts.ReadOnly { - d.mu.mem.mutable = mem - d.mu.mem.queue = append(d.mu.mem.queue, entry) - } + d.mu.mem.mutable = mem + d.mu.mem.queue = append(d.mu.mem.queue, entry) } - // updateVE is used to update ve with information about new files created - // during the flush of any flushable not of type ingestedFlushable. For the - // flushable of type ingestedFlushable we use custom handling below. - updateVE := func() error { - // TODO(bananabrick): See if we can use the actual base level here, - // instead of using 1. - c, err := newFlush(d.opts, d.mu.versions.currentVersion(), - 1 /* base level */, toFlush, d.timeNow()) - if err != nil { - return err - } - newVE, _, err := d.runCompaction(jobID, c) - if err != nil { - return errors.Wrapf(err, "running compaction during WAL replay") - } - ve.NewFiles = append(ve.NewFiles, newVE.NewFiles...) - return nil - } defer func() { if err != nil { err = errors.WithDetailf(err, "replaying wal %d, offset %s", ll.Num, offset) @@ -905,121 +945,26 @@ func (d *DB) replayWAL( batchesReplayed++ { br := b.Reader() - if kind, encodedFileNum, _, ok, err := br.Next(); err != nil { + if kind, _, _, ok, err := br.Next(); err != nil { return nil, 0, err } else if ok && kind == InternalKeyKindIngestSST { - fileNums := make([]base.DiskFileNum, 0, b.Count()) - addFileNum := func(encodedFileNum []byte) { - fileNum, n := binary.Uvarint(encodedFileNum) - if n <= 0 { - panic("pebble: ingest sstable file num is invalid.") - } - fileNums = append(fileNums, base.DiskFileNum(fileNum)) - } - addFileNum(encodedFileNum) - - for i := 1; i < int(b.Count()); i++ { - kind, encodedFileNum, _, ok, err := br.Next() - if err != nil { - return nil, 0, err - } - if kind != InternalKeyKindIngestSST { - panic("pebble: invalid batch key kind.") - } - if !ok { - panic("pebble: invalid batch count.") - } - addFileNum(encodedFileNum) - } - - if _, _, _, ok, err := br.Next(); err != nil { - return nil, 0, err - } else if ok { - panic("pebble: invalid number of entries in batch.") - } - - meta := make([]*fileMetadata, len(fileNums)) - for i, n := range fileNums { - var readable objstorage.Readable - objMeta, err := d.objProvider.Lookup(fileTypeTable, n) - if err != nil { - return nil, 0, errors.Wrap(err, "pebble: error when looking up ingested SSTs") - } - if objMeta.IsRemote() { - readable, err = d.objProvider.OpenForReading(context.TODO(), fileTypeTable, n, objstorage.OpenOptions{MustExist: true}) - if err != nil { - return nil, 0, errors.Wrap(err, "pebble: error when opening flushable ingest files") - } - } else { - path := base.MakeFilepath(d.opts.FS, d.dirname, fileTypeTable, n) - f, err := d.opts.FS.Open(path) - if err != nil { - return nil, 0, err - } - - readable, err = sstable.NewSimpleReadable(f) - if err != nil { - return nil, 0, err - } - } - // NB: ingestLoad1 will close readable. 
- meta[i], err = ingestLoad1(context.TODO(), d.opts, d.FormatMajorVersion(), readable, d.cacheID, base.PhysicalTableFileNum(n)) - if err != nil { - return nil, 0, errors.Wrap(err, "pebble: error when loading flushable ingest files") - } - } - - if uint32(len(meta)) != b.Count() { - panic("pebble: couldn't load all files in WAL entry.") - } - - entry, err = d.newIngestedFlushableEntry(meta, seqNum, base.DiskFileNum(ll.Num), KeyRange{}) + // We're in the flushable ingests case. + // + // Ingests require an up-to-date view of the LSM to determine the target + // level of ingested sstables, and to accurately compute excises. Instead of + // doing an ingest in this function, we just enqueue a flushable ingest + // in the flushables queue and run a regular flush. + flushMem() + // mem is nil here. + entry, err = d.replayIngestedFlushable(&b, base.DiskFileNum(ll.Num)) if err != nil { return nil, 0, err } - - if d.opts.ReadOnly { - d.mu.mem.queue = append(d.mu.mem.queue, entry) - // We added the IngestSST flushable to the queue. But there - // must be at least one WAL entry waiting to be replayed. We - // have to ensure this newer WAL entry isn't replayed into - // the current value of d.mu.mem.mutable because the current - // mutable memtable exists before this flushable entry in - // the memtable queue. To ensure this, we just need to unset - // d.mu.mem.mutable. When a newer WAL is replayed, we will - // set d.mu.mem.mutable to a newer value. - d.mu.mem.mutable = nil - } else { - toFlush = append(toFlush, entry) - // During WAL replay, the lsm only has L0, hence, the - // baseLevel is 1. For the sake of simplicity, we place the - // ingested files in L0 here, instead of finding their - // target levels. This is a simplification for the sake of - // simpler code. It is expected that WAL replay should be - // rare, and that flushables of type ingestedFlushable - // should also be rare. So, placing the ingested files in L0 - // is alright. - // - // TODO(bananabrick): Maybe refactor this function to allow - // us to easily place ingested files in levels as low as - // possible during WAL replay. It would require breaking up - // the application of ve to the manifest into chunks and is - // not pretty w/o a refactor to this function and how it's - // used. - c, err := newFlush( - d.opts, d.mu.versions.currentVersion(), - 1, /* base level */ - []*flushableEntry{entry}, - d.timeNow(), - ) - if err != nil { - return nil, 0, err - } - for _, file := range c.flushing[0].flushable.(*ingestedFlushable).files { - ve.NewFiles = append(ve.NewFiles, newFileEntry{Level: 0, Meta: file.FileMetadata}) - } - } - return toFlush, maxSeqNum, nil + fi := entry.flushable.(*ingestedFlushable) + flushableIngests = append(flushableIngests, fi) + d.mu.mem.queue = append(d.mu.mem.queue, entry) + // A flushable ingest is always followed by a WAL rotation. + break } } @@ -1036,20 +981,7 @@ func (d *DB) replayWAL( // Disable memory accounting by adding a reader ref that will never be // removed. entry.readerRefs.Add(1) - if d.opts.ReadOnly { - d.mu.mem.queue = append(d.mu.mem.queue, entry) - // We added the flushable batch to the flushable to the queue. - // But there must be at least one WAL entry waiting to be - // replayed. We have to ensure this newer WAL entry isn't - // replayed into the current value of d.mu.mem.mutable because - // the current mutable memtable exists before this flushable - // entry in the memtable queue. To ensure this, we just need to - // unset d.mu.mem.mutable. 
When a newer WAL is replayed, we will
-				// set d.mu.mem.mutable to a newer value.
-				d.mu.mem.mutable = nil
-			} else {
-				toFlush = append(toFlush, entry)
-			}
+			d.mu.mem.queue = append(d.mu.mem.queue, entry)
 		} else {
 			ensureMem(seqNum)
 			if err = mem.prepare(&b); err != nil && err != arenaskl.ErrArenaFull {
@@ -1076,16 +1008,12 @@ func (d *DB) replayWAL(
 	d.opts.Logger.Infof("[JOB %d] WAL %s stopped reading at offset: %s; replayed %d keys in %d batches",
 		jobID, base.DiskFileNum(ll.Num).String(), offset, keysReplayed, batchesReplayed)
-	flushMem()
-
-	// mem is nil here.
-	if !d.opts.ReadOnly && batchesReplayed > 0 {
-		err = updateVE()
-		if err != nil {
-			return nil, 0, err
-		}
+	if !d.opts.ReadOnly {
+		flushMem()
 	}
-	return toFlush, maxSeqNum, err
+
+	// mem is nil here, if !ReadOnly.
+	return flushableIngests, maxSeqNum, err
 }
 
 func readOptionsFile(opts *Options, path string) (string, error) {
diff --git a/open_test.go b/open_test.go
index fff9024b4d..fbee9af41e 100644
--- a/open_test.go
+++ b/open_test.go
@@ -1022,8 +1022,18 @@ func TestCrashOpenCrashAfterWALCreation(t *testing.T) {
 	require.NotNil(t, crashFS)
 	fs = crashFS
 
-	if n := len(getLogs()); n != 2 {
-		t.Fatalf("expected two logs, found %d\n", n)
+	newLogs := getLogs()
+	if n := len(newLogs); n > 2 || n < 1 {
+		t.Fatalf("expected one or two logs, found %d\n", n)
+	} else if n == 1 {
+		// On rare occasions, the cleaner may race with d.Close() and delete the
+		// old log before we list the directory. If we only see one log, confirm
+		// that it has a higher log num than the previous log.
+		origLogNum, err := strconv.Atoi(strings.Split(logs[0], ".")[0])
+		require.NoError(t, err)
+		curLogNum, err := strconv.Atoi(strings.Split(newLogs[0], ".")[0])
+		require.NoError(t, err)
+		require.Greater(t, curLogNum, origLogNum)
 	}
 
 	// Finally, open the database with syncs enabled.
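The replayIngestedFlushable helper above decodes one batch entry per ingested
sstable, each key holding a uvarint-encoded file number. A minimal,
self-contained sketch of that decoding under the same encoding assumptions
(decodeFileNums and the sample file numbers are illustrative, not part of this
change):

package main

import (
	"encoding/binary"
	"fmt"
)

// decodeFileNums mirrors the decoding loop in replayIngestedFlushable: each
// batch entry's key is a uvarint-encoded sstable file number.
func decodeFileNums(keys [][]byte) ([]uint64, error) {
	nums := make([]uint64, 0, len(keys))
	for _, k := range keys {
		n, w := binary.Uvarint(k)
		if w <= 0 {
			return nil, fmt.Errorf("invalid ingest sstable file num")
		}
		nums = append(nums, n)
	}
	return nums, nil
}

func main() {
	// Encode a few hypothetical file numbers the way an ingest batch key would.
	var keys [][]byte
	for _, n := range []uint64{5, 6, 10} {
		buf := make([]byte, binary.MaxVarintLen64)
		keys = append(keys, buf[:binary.PutUvarint(buf, n)])
	}
	nums, err := decodeFileNums(keys)
	fmt.Println(nums, err) // [5 6 10] <nil>
}

Running it prints the recovered file numbers, mirroring how the helper
rebuilds the set of sstables to re-enqueue as a flushable ingest.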
diff --git a/replay/testdata/corpus/findManifestStart b/replay/testdata/corpus/findManifestStart index f5369f953d..008da2164a 100644 --- a/replay/testdata/corpus/findManifestStart +++ b/replay/testdata/corpus/findManifestStart @@ -76,11 +76,11 @@ list-files build ---- build: 000005.sst - 000006.log + 000008.log LOCK MANIFEST-000001 MANIFEST-000007 - OPTIONS-000008 + OPTIONS-000009 marker.format-version.000001.013 marker.manifest.000002.MANIFEST-000007 diff --git a/replay/testdata/corpus/high_read_amp b/replay/testdata/corpus/high_read_amp index 800511b5ca..1d2b917bb0 100644 --- a/replay/testdata/corpus/high_read_amp +++ b/replay/testdata/corpus/high_read_amp @@ -70,14 +70,14 @@ build: 000005.sst 000006.log 000007.sst - 000009.log - 000010.sst + 000008.log + 000009.sst LOCK - MANIFEST-000008 - MANIFEST-000011 + MANIFEST-000001 + MANIFEST-000010 OPTIONS-000003 marker.format-version.000001.013 - marker.manifest.000003.MANIFEST-000011 + marker.manifest.000002.MANIFEST-000010 start ---- @@ -88,12 +88,12 @@ list-files high_read_amp/checkpoint high_read_amp/checkpoint: 000005.sst 000007.sst - 000009.log - 000010.sst - MANIFEST-000011 + 000008.log + 000009.sst + MANIFEST-000010 OPTIONS-000003 marker.format-version.000001.013 - marker.manifest.000001.MANIFEST-000011 + marker.manifest.000001.MANIFEST-000010 commit set d d @@ -114,6 +114,6 @@ stopped list-files high_read_amp ---- high_read_amp: - 000013.sst - MANIFEST-000011 + 000012.sst + MANIFEST-000010 checkpoint diff --git a/replay/testdata/corpus/simple b/replay/testdata/corpus/simple index 28d6a51dd8..793545090b 100644 --- a/replay/testdata/corpus/simple +++ b/replay/testdata/corpus/simple @@ -72,14 +72,12 @@ list-files simple simple: 000007.sst MANIFEST-000001 - MANIFEST-000008 checkpoint stat simple/MANIFEST-000001 simple/MANIFEST-000008 simple/000007.sst ---- simple/MANIFEST-000001: - size: 98 -simple/MANIFEST-000008: - size: 122 + size: 133 +simple/MANIFEST-000008: stat simple/MANIFEST-000008: file does not exist simple/000007.sst: size: 614 diff --git a/replay/testdata/replay b/replay/testdata/replay index 6fb775dec7..7ba447b567 100644 --- a/replay/testdata/replay +++ b/replay/testdata/replay @@ -10,19 +10,17 @@ tree 49 000006.log 614 000007.sst 0 LOCK - 98 MANIFEST-000001 - 122 MANIFEST-000008 + 133 MANIFEST-000001 1359 OPTIONS-000003 0 marker.format-version.000001.013 - 0 marker.manifest.000002.MANIFEST-000008 + 0 marker.manifest.000001.MANIFEST-000001 simple/ 614 000007.sst - 98 MANIFEST-000001 - 122 MANIFEST-000008 + 133 MANIFEST-000001 checkpoint/ 25 000004.log 586 000005.sst - 98 MANIFEST-000001 + 85 MANIFEST-000001 1359 OPTIONS-000003 0 marker.format-version.000001.013 0 marker.manifest.000001.MANIFEST-000001 diff --git a/replay/testdata/replay_paced b/replay/testdata/replay_paced index fdaa2a96eb..560ab347a2 100644 --- a/replay/testdata/replay_paced +++ b/replay/testdata/replay_paced @@ -7,28 +7,28 @@ tree build/ 864 000005.sst 560 000007.sst - 89 000009.log - 560 000010.sst - 200 000012.log - 614 000013.sst + 89 000008.log + 560 000009.sst + 200 000011.log + 614 000012.sst 0 LOCK - 122 MANIFEST-000008 - 205 MANIFEST-000011 + 133 MANIFEST-000001 + 205 MANIFEST-000010 1359 OPTIONS-000003 0 marker.format-version.000001.013 - 0 marker.manifest.000003.MANIFEST-000011 + 0 marker.manifest.000002.MANIFEST-000010 high_read_amp/ - 614 000013.sst - 205 MANIFEST-000011 + 614 000012.sst + 205 MANIFEST-000010 checkpoint/ 864 000005.sst 560 000007.sst - 39 000009.log - 560 000010.sst - 157 MANIFEST-000011 + 39 000008.log + 560 000009.sst + 157 
MANIFEST-000010 1359 OPTIONS-000003 0 marker.format-version.000001.013 - 0 marker.manifest.000001.MANIFEST-000011 + 0 marker.manifest.000001.MANIFEST-000010 replay high_read_amp fixed 1 ---- diff --git a/testdata/checkpoint b/testdata/checkpoint index c323f8c0cd..d69dba834c 100644 --- a/testdata/checkpoint +++ b/testdata/checkpoint @@ -18,7 +18,6 @@ create: db/marker.manifest.000001.MANIFEST-000001 close: db/marker.manifest.000001.MANIFEST-000001 sync: db open-dir: db -sync: db/MANIFEST-000001 create: db/000002.log sync: db create: db/marker.format-version.000001.014 @@ -644,25 +643,25 @@ open-dir: checkpoints/checkpoint5 open: checkpoints/checkpoint5/OPTIONS-000003 close: checkpoints/checkpoint5/OPTIONS-000003 open: checkpoints/checkpoint5/000008.log -create: checkpoints/checkpoint5/000017.sst -sync-data: checkpoints/checkpoint5/000017.sst -close: checkpoints/checkpoint5/000017.sst -sync: checkpoints/checkpoint5 close: checkpoints/checkpoint5/000008.log +create: checkpoints/checkpoint5/000018.sst +sync-data: checkpoints/checkpoint5/000018.sst +close: checkpoints/checkpoint5/000018.sst +sync: checkpoints/checkpoint5 create: checkpoints/checkpoint5/MANIFEST-000019 sync: checkpoints/checkpoint5/MANIFEST-000019 create: checkpoints/checkpoint5/marker.manifest.000002.MANIFEST-000019 close: checkpoints/checkpoint5/marker.manifest.000002.MANIFEST-000019 remove: checkpoints/checkpoint5/marker.manifest.000001.MANIFEST-000001 sync: checkpoints/checkpoint5 -create: checkpoints/checkpoint5/000018.log +remove: checkpoints/checkpoint5/000008.log +create: checkpoints/checkpoint5/000020.log sync: checkpoints/checkpoint5 -create: checkpoints/checkpoint5/temporary.000020.dbtmp -sync: checkpoints/checkpoint5/temporary.000020.dbtmp -close: checkpoints/checkpoint5/temporary.000020.dbtmp -rename: checkpoints/checkpoint5/temporary.000020.dbtmp -> checkpoints/checkpoint5/OPTIONS-000020 +create: checkpoints/checkpoint5/temporary.000021.dbtmp +sync: checkpoints/checkpoint5/temporary.000021.dbtmp +close: checkpoints/checkpoint5/temporary.000021.dbtmp +rename: checkpoints/checkpoint5/temporary.000021.dbtmp -> checkpoints/checkpoint5/OPTIONS-000021 sync: checkpoints/checkpoint5 -remove: checkpoints/checkpoint5/000008.log remove: checkpoints/checkpoint5/OPTIONS-000003 print-backing checkpoints/checkpoint5 @@ -674,7 +673,7 @@ print-backing checkpoints/checkpoint5 lsm checkpoints/checkpoint5 ---- L0.0: - 000017:[h#18,SET-h#18,SET] + 000018:[h#18,SET-h#18,SET] L6: 000013(000010):[d#0,SET-g#0,SET] 000015(000011):[i#20,SET-i#20,SET] @@ -741,25 +740,25 @@ open-dir: checkpoints/checkpoint6 open: checkpoints/checkpoint6/OPTIONS-000003 close: checkpoints/checkpoint6/OPTIONS-000003 open: checkpoints/checkpoint6/000008.log -create: checkpoints/checkpoint6/000017.sst -sync-data: checkpoints/checkpoint6/000017.sst -close: checkpoints/checkpoint6/000017.sst -sync: checkpoints/checkpoint6 close: checkpoints/checkpoint6/000008.log +create: checkpoints/checkpoint6/000018.sst +sync-data: checkpoints/checkpoint6/000018.sst +close: checkpoints/checkpoint6/000018.sst +sync: checkpoints/checkpoint6 create: checkpoints/checkpoint6/MANIFEST-000019 sync: checkpoints/checkpoint6/MANIFEST-000019 create: checkpoints/checkpoint6/marker.manifest.000002.MANIFEST-000019 close: checkpoints/checkpoint6/marker.manifest.000002.MANIFEST-000019 remove: checkpoints/checkpoint6/marker.manifest.000001.MANIFEST-000001 sync: checkpoints/checkpoint6 -create: checkpoints/checkpoint6/000018.log +remove: checkpoints/checkpoint6/000008.log +create: 
checkpoints/checkpoint6/000020.log sync: checkpoints/checkpoint6 -create: checkpoints/checkpoint6/temporary.000020.dbtmp -sync: checkpoints/checkpoint6/temporary.000020.dbtmp -close: checkpoints/checkpoint6/temporary.000020.dbtmp -rename: checkpoints/checkpoint6/temporary.000020.dbtmp -> checkpoints/checkpoint6/OPTIONS-000020 +create: checkpoints/checkpoint6/temporary.000021.dbtmp +sync: checkpoints/checkpoint6/temporary.000021.dbtmp +close: checkpoints/checkpoint6/temporary.000021.dbtmp +rename: checkpoints/checkpoint6/temporary.000021.dbtmp -> checkpoints/checkpoint6/OPTIONS-000021 sync: checkpoints/checkpoint6 -remove: checkpoints/checkpoint6/000008.log remove: checkpoints/checkpoint6/OPTIONS-000003 print-backing checkpoints/checkpoint6 @@ -769,7 +768,7 @@ print-backing checkpoints/checkpoint6 lsm checkpoints/checkpoint6 ---- L0.0: - 000017:[h#18,SET-h#18,SET] + 000018:[h#18,SET-h#18,SET] L6: 000015(000011):[i#20,SET-i#20,SET] 000016(000011):[k#20,SET-k#20,SET] diff --git a/testdata/checkpoint_shared b/testdata/checkpoint_shared index fd264a5e7f..af0a597441 100644 --- a/testdata/checkpoint_shared +++ b/testdata/checkpoint_shared @@ -18,7 +18,6 @@ create: db/marker.manifest.000001.MANIFEST-000001 close: db/marker.manifest.000001.MANIFEST-000001 sync: db open-dir: db -sync: db/MANIFEST-000001 create: db/000002.log sync: db create: db/marker.format-version.000001.017 diff --git a/testdata/cleaner b/testdata/cleaner index b2219b053d..0351b989a7 100644 --- a/testdata/cleaner +++ b/testdata/cleaner @@ -24,7 +24,6 @@ create: db/marker.manifest.000001.MANIFEST-000001 close: db/marker.manifest.000001.MANIFEST-000001 sync: db open-dir: db_wal -sync: db/MANIFEST-000001 create: db_wal/000002.log sync: db_wal create: db/marker.format-version.000001.013 @@ -154,7 +153,6 @@ create: db1/marker.manifest.000001.MANIFEST-000001 close: db1/marker.manifest.000001.MANIFEST-000001 sync: db1 open-dir: db1_wal -sync: db1/MANIFEST-000001 create: db1_wal/000002.log sync: db1_wal create: db1/marker.format-version.000001.013 @@ -240,15 +238,15 @@ create: db1/marker.manifest.000002.MANIFEST-000458 close: db1/marker.manifest.000002.MANIFEST-000458 remove: db1/marker.manifest.000001.MANIFEST-000001 sync: db1 -create: db1_wal/000457.log -sync: db1_wal -create: db1/temporary.000459.dbtmp -sync: db1/temporary.000459.dbtmp -close: db1/temporary.000459.dbtmp -rename: db1/temporary.000459.dbtmp -> db1/OPTIONS-000459 -sync: db1 remove: db1_wal/000002.log remove: db1_wal/000004.log +create: db1_wal/000459.log +sync: db1_wal +create: db1/temporary.000460.dbtmp +sync: db1/temporary.000460.dbtmp +close: db1/temporary.000460.dbtmp +rename: db1/temporary.000460.dbtmp -> db1/OPTIONS-000460 +sync: db1 remove: db1/000123.sst remove: db1/000456.sst remove: db1/OPTIONS-000003 @@ -259,6 +257,6 @@ list db1 LOCK MANIFEST-000001 MANIFEST-000458 -OPTIONS-000459 +OPTIONS-000460 marker.format-version.000001.013 marker.manifest.000002.MANIFEST-000458 diff --git a/testdata/event_listener b/testdata/event_listener index ec7d1f325c..77c34a2a27 100644 --- a/testdata/event_listener +++ b/testdata/event_listener @@ -24,7 +24,6 @@ close: db/marker.manifest.000001.MANIFEST-000001 sync: db [JOB 1] MANIFEST created 000001 open-dir: wal -sync: db/MANIFEST-000001 create: wal/000002.log sync: wal [JOB 1] WAL created 000002 diff --git a/testdata/flushable_ingest b/testdata/flushable_ingest index c5806832d0..cf5d2d3596 100644 --- a/testdata/flushable_ingest +++ b/testdata/flushable_ingest @@ -93,7 +93,8 @@ lsm L0.1: 000004:[a#11,SET-a#11,SET] L0.0: - 
000009:[a#10,SET-a#10,SET] + 000010:[a#10,SET-a#10,SET] +L6: 000005:[b#12,SET-b#12,SET] 000006:[d#13,SET-d#13,SET] @@ -421,10 +422,11 @@ lsm L0.1: 000004:[a#11,SET-a#11,SET] L0.0: - 000009:[a#10,SET-a#10,SET] + 000010:[a#10,SET-a#10,SET] + 000012:[f#14,SET-f#14,SET] +L6: 000005:[b#12,SET-b#12,SET] 000006:[d#13,SET-d#13,SET] - 000010:[f#14,SET-f#14,SET] # Check if the new mutable memtable is using a new log file, and that the # previous log files have been deleted appropriately after the flush. @@ -433,16 +435,16 @@ ls 000004.sst 000005.sst 000006.sst -000009.sst 000010.sst -000011.log +000012.sst +000013.log LOCK MANIFEST-000001 -MANIFEST-000012 -OPTIONS-000013 +MANIFEST-000011 +OPTIONS-000014 ext marker.format-version.000004.017 -marker.manifest.000002.MANIFEST-000012 +marker.manifest.000002.MANIFEST-000011 # Make sure that the new mutable memtable can accept writes. batch diff --git a/testdata/iter_histories/errors b/testdata/iter_histories/errors index 16cd5ef2d5..9b0bbdf4b2 100644 --- a/testdata/iter_histories/errors +++ b/testdata/iter_histories/errors @@ -51,7 +51,7 @@ next next next ---- -err=pebble: backing file 000004 error: injected error +err=pebble: backing file 000005 error: injected error a: (a, .) b: (b, .) c: (c, .) diff --git a/testdata/metrics b/testdata/metrics index d067dd86e7..cfe9215e62 100644 --- a/testdata/metrics +++ b/testdata/metrics @@ -962,10 +962,10 @@ level | tables size val-bl vtables | score | in | tables size | tables siz total | 3 1.7KB 0B 0 | - | 0B | 0 0B | 0 0B | 0 0B | 0B | 3 0.0 ------------------------------------------------------------------------------------------------------------------- WAL: 1 files (0B) in: 0B written: 0B (0% overhead) -Flushes: 0 +Flushes: 1 Compactions: 0 estimated debt: 1.7KB in progress: 0 (0B) default: 0 delete: 0 elision: 0 move: 0 read: 0 tombstone-density: 0 rewrite: 0 copy: 0 multi-level: 0 -MemTables: 1 (256KB) zombie: 0 (0B) +MemTables: 1 (512KB) zombie: 1 (256KB) Zombie tables: 0 (0B, local: 0B) Backing tables: 0 (0B) Virtual tables: 0 (0B) @@ -983,7 +983,7 @@ Cgo memory usage: 0B block cache: 0B (data: 0B, maps: 0B, entries: 0B) memtabl compact a-z ---- L6: - 000013:[a#0,SET-c#0,SET] + 000014:[a#0,SET-c#0,SET] # All tables are local after compaction. metrics zero-cache-hits-misses @@ -1001,10 +1001,10 @@ level | tables size val-bl vtables | score | in | tables size | tables siz total | 1 603B 0B 0 | - | 0B | 0 0B | 0 0B | 1 603B | 1.7KB | 1 0.0 ------------------------------------------------------------------------------------------------------------------- WAL: 1 files (0B) in: 0B written: 0B (0% overhead) -Flushes: 0 +Flushes: 1 Compactions: 1 estimated debt: 0B in progress: 0 (0B) default: 1 delete: 0 elision: 0 move: 0 read: 0 tombstone-density: 0 rewrite: 0 copy: 0 multi-level: 0 -MemTables: 1 (256KB) zombie: 0 (0B) +MemTables: 1 (512KB) zombie: 1 (256KB) Zombie tables: 0 (0B, local: 0B) Backing tables: 0 (0B) Virtual tables: 0 (0B) diff --git a/testdata/open_wal_failover b/testdata/open_wal_failover index 8d3a3c4992..fdff52b1d6 100644 --- a/testdata/open_wal_failover +++ b/testdata/open_wal_failover @@ -33,17 +33,17 @@ list path=(b,) list path=(a,data) ---- - 000004.log + 000006.log LOCK MANIFEST-000001 MANIFEST-000005 - OPTIONS-000006 + OPTIONS-000007 marker.format-version.000001.013 marker.manifest.000002.MANIFEST-000005 # The new OPTIONS file should declare the secondary WAL path. 
-grep-between path=(a,data/OPTIONS-000006) start=(\[WAL Failover\]) end=^$ +grep-between path=(a,data/OPTIONS-000007) start=(\[WAL Failover\]) end=^$ ---- secondary_dir=secondary-wals primary_dir_probe_interval=1s diff --git a/testdata/table_stats b/testdata/table_stats index ba1bbc4eea..46dfdb4268 100644 --- a/testdata/table_stats +++ b/testdata/table_stats @@ -51,7 +51,7 @@ reopen wait-loaded-initial ---- -[JOB 2] all initial table stats loaded +[JOB 3] all initial table stats loaded wait-pending-table-stats 000007 @@ -81,18 +81,18 @@ set b 2 flush ---- L0.0: - 000012:[a#14,SET-b#15,SET] + 000013:[a#14,SET-b#15,SET] compact a-c ---- L6: - 000012:[a#14,SET-b#15,SET] + 000013:[a#14,SET-b#15,SET] enable ---- wait-pending-table-stats -000012 +000013 ---- num-entries: 2 num-deletions: 0 @@ -113,9 +113,9 @@ del-range a c flush ---- L0.0: - 000014:[a#16,RANGEDEL-c#inf,RANGEDEL] + 000015:[a#16,RANGEDEL-c#inf,RANGEDEL] L6: - 000012:[a#14,SET-b#15,SET] + 000013:[a#14,SET-b#15,SET] compact a-c ---- @@ -124,7 +124,7 @@ enable ---- wait-pending-table-stats -000014 +000015 ---- (not found) diff --git a/version_set.go b/version_set.go index 2762aed001..7ed726110b 100644 --- a/version_set.go +++ b/version_set.go @@ -170,9 +170,12 @@ func (vs *versionSet) create( mu *sync.Mutex, ) error { vs.init(dirname, provider, opts, marker, getFormatMajorVersion, mu) - newVersion := &version{} + var bve bulkVersionEdit + newVersion, err := bve.Apply(nil /* curr */, opts.Comparer, opts.FlushSplitBytes, opts.Experimental.ReadCompactionRate) + if err != nil { + return err + } vs.append(newVersion) - var err error vs.picker = newCompactionPickerByScore(newVersion, &vs.virtualBackings, vs.opts, nil) // Note that a "snapshot" version edit is written to the manifest when it is
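The recover-path cleanup added to Open above frees the recycled memtable via
d.memTableRecycle.Swap(nil), so exactly one cleanup path can claim it. A rough
sketch of that claim-by-swap pattern, with stand-in types (memTable, recycler)
rather than Pebble's own:

package main

import (
	"fmt"
	"sync/atomic"
)

type memTable struct{ buf []byte }

// recycler keeps at most one spare memtable for reuse, like d.memTableRecycle.
type recycler struct{ p atomic.Pointer[memTable] }

// free claims the spare memtable, if any, and releases it. Swap(nil) hands
// ownership to exactly one caller, so a failed Open's recover block and any
// other cleanup path cannot double-free the same memtable.
func (r *recycler) free() {
	if m := r.p.Swap(nil); m != nil {
		fmt.Printf("freed %d bytes\n", len(m.buf)) // stand-in for d.freeMemTable
	}
}

func main() {
	var r recycler
	r.p.Store(&memTable{buf: make([]byte, 1<<10)})
	r.free() // releases the spare once
	r.free() // no-op: ownership was already claimed
}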