diff --git a/compaction.go b/compaction.go index 03ab5dec7b4..21dd75ca865 100644 --- a/compaction.go +++ b/compaction.go @@ -1447,17 +1447,68 @@ func (d *DB) runIngestFlush(c *compaction) (*manifest.VersionEdit, error) { var err error var fileToSplit *fileMetadata var ingestSplitFiles []ingestSplitFile - for _, file := range c.flushing[0].flushable.(*ingestedFlushable).files { - suggestSplit := d.opts.Experimental.IngestSplit != nil && d.opts.Experimental.IngestSplit() && - d.FormatMajorVersion() >= FormatVirtualSSTables - level, fileToSplit, err = ingestTargetLevel( - d.newIters, d.tableNewRangeKeyIter, iterOpts, d.opts.Comparer, - c.version, baseLevel, d.mu.compact.inProgress, file.FileMetadata, - suggestSplit, - ) + ingestFlushable := c.flushing[0].flushable.(*ingestedFlushable) + + updateLevelMetricsOnExcise := func(m *fileMetadata, level int, added []newFileEntry) { + levelMetrics := c.metrics[level] + if levelMetrics == nil { + levelMetrics = &LevelMetrics{} + c.metrics[level] = levelMetrics + } + levelMetrics.NumFiles-- + levelMetrics.Size -= int64(m.Size) + for i := range added { + levelMetrics.NumFiles++ + levelMetrics.Size += int64(added[i].Meta.Size) + } + } + + suggestSplit := d.opts.Experimental.IngestSplit != nil && d.opts.Experimental.IngestSplit() && + d.FormatMajorVersion() >= FormatVirtualSSTables + + replacedFiles := make(map[base.FileNum][]newFileEntry) + for _, file := range ingestFlushable.files { + // This file fits perfectly within the excise span, so we can slot it at L6. + if ingestFlushable.exciseSpan.Valid() && ingestFlushable.exciseSpan.Contains(d.cmp, file.FileMetadata.Smallest) && ingestFlushable.exciseSpan.Contains(d.cmp, file.FileMetadata.Largest) { + level = 6 + } else { + level, fileToSplit, err = ingestTargetLevel( + d.newIters, d.tableNewRangeKeyIter, iterOpts, d.opts.Comparer, + c.version, baseLevel, d.mu.compact.inProgress, file.FileMetadata, + suggestSplit, + ) + } + + if ingestFlushable.exciseSpan.Valid() { + ve.DeletedFiles = map[manifest.DeletedFileEntry]*manifest.FileMetadata{} + // Iterate through all levels and find files that intersect with exciseSpan. + for level = range c.version.Levels { + overlaps := c.version.Overlaps(level, ingestFlushable.exciseSpan.Start, ingestFlushable.exciseSpan.End, true /* exclusiveEnd */) + iter := overlaps.Iter() + + for m := iter.First(); m != nil; m = iter.Next() { + newFiles, err := d.excise(ingestFlushable.exciseSpan, m, ve, level) + if err != nil { + return nil, err + } + + if _, ok := ve.DeletedFiles[deletedFileEntry{ + Level: level, + FileNum: m.FileNum, + }]; !ok { + // We did not excise this file. + continue + } + replacedFiles[m.FileNum] = newFiles + updateLevelMetricsOnExcise(m, level, newFiles) + } + } + } if err != nil { return nil, err } + + // Add the current flushableIngest file to the version. ve.NewFiles = append(ve.NewFiles, newFileEntry{Level: level, Meta: file.FileMetadata}) if fileToSplit != nil { ingestSplitFiles = append(ingestSplitFiles, ingestSplitFile{ @@ -1475,23 +1526,7 @@ func (d *DB) runIngestFlush(c *compaction) (*manifest.VersionEdit, error) { levelMetrics.TablesIngested++ } - updateLevelMetricsOnExcise := func(m *fileMetadata, level int, added []newFileEntry) { - levelMetrics := c.metrics[level] - if levelMetrics == nil { - levelMetrics = &LevelMetrics{} - c.metrics[level] = levelMetrics - } - levelMetrics.NumFiles-- - levelMetrics.Size -= int64(m.Size) - for i := range added { - levelMetrics.NumFiles++ - levelMetrics.Size += int64(added[i].Meta.Size) - } - } - if len(ingestSplitFiles) > 0 { - ve.DeletedFiles = make(map[manifest.DeletedFileEntry]*manifest.FileMetadata) - replacedFiles := make(map[base.FileNum][]newFileEntry) if err := d.ingestSplit(ve, updateLevelMetricsOnExcise, ingestSplitFiles, replacedFiles); err != nil { return nil, err } diff --git a/data_test.go b/data_test.go index 3e5933fe0c4..a38817d72de 100644 --- a/data_test.go +++ b/data_test.go @@ -1250,6 +1250,7 @@ func (d *DB) waitTableStats() { func runIngestAndExciseCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error { var exciseSpan KeyRange + var doFlushableIngest bool paths := make([]string, 0, len(td.CmdArgs)) for i, arg := range td.CmdArgs { switch td.CmdArgs[i].Key { @@ -1263,12 +1264,14 @@ func runIngestAndExciseCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error { } exciseSpan.Start = []byte(fields[0]) exciseSpan.End = []byte(fields[1]) + case "flushable-ingest": + doFlushableIngest = true default: paths = append(paths, arg.String()) } } - if _, err := d.IngestAndExcise(paths, nil /* shared */, nil /* external */, exciseSpan); err != nil { + if _, err := d.IngestAndExcise(paths, nil, nil, exciseSpan, doFlushableIngest); err != nil { return err } return nil diff --git a/flushable.go b/flushable.go index 630191f7375..58de177717c 100644 --- a/flushable.go +++ b/flushable.go @@ -158,6 +158,9 @@ type ingestedFlushable struct { slice manifest.LevelSlice // hasRangeKeys is set on ingestedFlushable construction. hasRangeKeys bool + // exciseSpan is populated if an excise operation should be performed during + // flush. + exciseSpan KeyRange } func newIngestedFlushable( @@ -165,6 +168,7 @@ func newIngestedFlushable( comparer *Comparer, newIters tableNewIters, newRangeKeyIters keyspanimpl.TableNewSpanIter, + exciseSpan KeyRange, ) *ingestedFlushable { var physicalFiles []physicalMeta var hasRangeKeys bool @@ -183,6 +187,7 @@ func newIngestedFlushable( // slice is immutable and can be set once and used many times. slice: manifest.NewLevelSliceKeySorted(comparer.Compare, files), hasRangeKeys: hasRangeKeys, + exciseSpan: exciseSpan, } return ret diff --git a/flushable_test.go b/flushable_test.go index cceb17745fd..4ca38e44ccf 100644 --- a/flushable_test.go +++ b/flushable_test.go @@ -119,9 +119,7 @@ func TestIngestedSSTFlushableAPI(t *testing.T) { } meta := loadFileMeta(paths) - flushable = newIngestedFlushable( - meta, d.opts.Comparer, d.newIters, d.tableNewRangeKeyIter, - ) + flushable = newIngestedFlushable(meta, d.opts.Comparer, d.newIters, d.tableNewRangeKeyIter, KeyRange{}) return "" case "iter": iter := flushable.newIter(nil) diff --git a/ingest.go b/ingest.go index f4fe59f1669..b790aa7146b 100644 --- a/ingest.go +++ b/ingest.go @@ -1101,7 +1101,7 @@ func (d *DB) Ingest(paths []string) error { if d.opts.ReadOnly { return ErrReadOnly } - _, err := d.ingest(paths, ingestTargetLevel, nil /* shared */, KeyRange{}, nil /* external */) + _, err := d.ingest(paths, ingestTargetLevel, nil, KeyRange{}, false, nil) return err } @@ -1188,7 +1188,7 @@ func (d *DB) IngestWithStats(paths []string) (IngestOperationStats, error) { if d.opts.ReadOnly { return IngestOperationStats{}, ErrReadOnly } - return d.ingest(paths, ingestTargetLevel, nil /* shared */, KeyRange{}, nil /* external */) + return d.ingest(paths, ingestTargetLevel, nil, KeyRange{}, false, nil) } // IngestExternalFiles does the same as IngestWithStats, and additionally @@ -1206,7 +1206,7 @@ func (d *DB) IngestExternalFiles(external []ExternalFile) (IngestOperationStats, if d.opts.Experimental.RemoteStorage == nil { return IngestOperationStats{}, errors.New("pebble: cannot ingest external files without shared storage configured") } - return d.ingest(nil, ingestTargetLevel, nil /* shared */, KeyRange{}, external) + return d.ingest(nil, ingestTargetLevel, nil, KeyRange{}, false, external) } // IngestAndExcise does the same as IngestWithStats, and additionally accepts a @@ -1220,7 +1220,11 @@ func (d *DB) IngestExternalFiles(external []ExternalFile) (IngestOperationStats, // Panics if this DB instance was not instantiated with a remote.Storage and // shared sstables are present. func (d *DB) IngestAndExcise( - paths []string, shared []SharedSSTMeta, external []ExternalFile, exciseSpan KeyRange, + paths []string, + shared []SharedSSTMeta, + external []ExternalFile, + exciseSpan KeyRange, + doFlushableIngest bool, ) (IngestOperationStats, error) { if err := d.closed.Load(); err != nil { panic(err) @@ -1243,12 +1247,12 @@ func (d *DB) IngestAndExcise( v, FormatMinForSharedObjects, ) } - return d.ingest(paths, ingestTargetLevel, shared, exciseSpan, external) + return d.ingest(paths, ingestTargetLevel, shared, exciseSpan, doFlushableIngest, external) } // Both DB.mu and commitPipeline.mu must be held while this is called. func (d *DB) newIngestedFlushableEntry( - meta []*fileMetadata, seqNum uint64, logNum base.DiskFileNum, + meta []*fileMetadata, seqNum uint64, logNum base.DiskFileNum, exciseSpan KeyRange, ) (*flushableEntry, error) { // Update the sequence number for all of the sstables in the // metadata. Writing the metadata to the manifest when the @@ -1264,7 +1268,7 @@ func (d *DB) newIngestedFlushableEntry( } } - f := newIngestedFlushable(meta, d.opts.Comparer, d.newIters, d.tableNewRangeKeyIter) + f := newIngestedFlushable(meta, d.opts.Comparer, d.newIters, d.tableNewRangeKeyIter, exciseSpan) // NB: The logNum/seqNum are the WAL number which we're writing this entry // to and the sequence number within the WAL which we'll write this entry @@ -1294,7 +1298,9 @@ func (d *DB) newIngestedFlushableEntry( // we're holding both locks, the order in which we rotate the memtable or // recycle the WAL in this function is irrelevant as long as the correct log // numbers are assigned to the appropriate flushable. -func (d *DB) handleIngestAsFlushable(meta []*fileMetadata, seqNum uint64) error { +func (d *DB) handleIngestAsFlushable( + meta []*fileMetadata, seqNum uint64, exciseSpan KeyRange, +) error { b := d.NewBatch() for _, m := range meta { b.ingestSST(m.FileNum) @@ -1320,7 +1326,7 @@ func (d *DB) handleIngestAsFlushable(meta []*fileMetadata, seqNum uint64) error d.mu.Lock() } - entry, err := d.newIngestedFlushableEntry(meta, seqNum, logNum) + entry, err := d.newIngestedFlushableEntry(meta, seqNum, logNum, exciseSpan) if err != nil { return err } @@ -1340,6 +1346,7 @@ func (d *DB) handleIngestAsFlushable(meta []*fileMetadata, seqNum uint64) error } } + d.mu.versions.metrics.Ingest.Count++ currMem := d.mu.mem.mutable // NB: Placing ingested sstables above the current memtables // requires rotating of the existing memtables/WAL. There is @@ -1349,6 +1356,7 @@ func (d *DB) handleIngestAsFlushable(meta []*fileMetadata, seqNum uint64) error d.mu.mem.queue = append(d.mu.mem.queue, entry) d.rotateMemtable(newLogNum, nextSeqNum, currMem) d.updateReadStateLocked(d.opts.DebugCheck) + // TODO(aaditya): is this necessary? we call this already in rotateMemtable above d.maybeScheduleFlush() return nil } @@ -1359,6 +1367,7 @@ func (d *DB) ingest( targetLevelFunc ingestTargetLevelFunc, shared []SharedSSTMeta, exciseSpan KeyRange, + doFlushableIngest bool, external []ExternalFile, ) (IngestOperationStats, error) { if len(shared) > 0 && d.opts.Experimental.RemoteStorage == nil { @@ -1553,11 +1562,19 @@ func (d *DB) ingest( mut.writerRef() return } - // The ingestion overlaps with some entry in the flushable queue. - if d.FormatMajorVersion() < FormatFlushableIngest || - d.opts.Experimental.DisableIngestAsFlushable() || - len(shared) > 0 || exciseSpan.Valid() || len(external) > 0 || - (len(d.mu.mem.queue) > d.opts.MemTableStopWritesThreshold-1) { + + // The ingestion overlaps with some entry in the flushable queue. If the + // pre-conditions are met below, we can treat this ingestion as a flushable + // ingest, otherwise we wait on the memtable flush before ingestion. + // + // TODO(aaditya): We should make flushableIngest compatible with remote + // files. + hasRemoteFiles := len(shared) > 0 || len(external) > 0 + canIngestFlushable := d.FormatMajorVersion() >= FormatFlushableIngest && + (len(d.mu.mem.queue) < d.opts.MemTableStopWritesThreshold) && + !d.opts.Experimental.DisableIngestAsFlushable() && !hasRemoteFiles + + if !canIngestFlushable || (exciseSpan.Valid() && !doFlushableIngest) { // We're not able to ingest as a flushable, // so we must synchronously flush. // @@ -1589,7 +1606,7 @@ func (d *DB) ingest( for i := range fileMetas { fileMetas[i] = loadResult.local[i].fileMetadata } - err = d.handleIngestAsFlushable(fileMetas, seqNum) + err = d.handleIngestAsFlushable(fileMetas, seqNum, exciseSpan) } var ve *versionEdit diff --git a/ingest_test.go b/ingest_test.go index acc29b7f066..f4babe2fc13 100644 --- a/ingest_test.go +++ b/ingest_test.go @@ -696,6 +696,9 @@ func TestExcise(t *testing.T) { case "ingest-and-excise": flushed = false + d.mu.Lock() + prevFlushableIngests := d.mu.versions.metrics.Flush.AsIngestCount + d.mu.Unlock() if err := runIngestAndExciseCmd(td, d, mem); err != nil { return err.Error() } @@ -704,8 +707,12 @@ func TestExcise(t *testing.T) { for d.mu.compact.flushing { d.mu.compact.cond.Wait() } + flushableIngests := d.mu.versions.metrics.Flush.AsIngestCount d.mu.Unlock() if flushed { + if prevFlushableIngests < flushableIngests { + return "flushable ingest" + } return "memtable flushed" } return "" @@ -1090,7 +1097,7 @@ func testIngestSharedImpl( require.NoError(t, err) require.NoError(t, w.Close()) - _, err = to.IngestAndExcise([]string{sstPath}, sharedSSTs, nil /* external */, KeyRange{Start: startKey, End: endKey}) + _, err = to.IngestAndExcise([]string{sstPath}, sharedSSTs, nil, KeyRange{Start: startKey, End: endKey}, false) require.NoError(t, err) return fmt.Sprintf("replicated %d shared SSTs", len(sharedSSTs)) @@ -1318,7 +1325,7 @@ func TestSimpleIngestShared(t *testing.T) { Level: 6, Size: uint64(size + 5), } - _, err = d.IngestAndExcise([]string{}, []SharedSSTMeta{sharedSSTMeta}, nil /* external */, KeyRange{Start: []byte("d"), End: []byte("ee")}) + _, err = d.IngestAndExcise([]string{}, []SharedSSTMeta{sharedSSTMeta}, nil, KeyRange{Start: []byte("d"), End: []byte("ee")}, false) require.NoError(t, err) // TODO(bilal): Once reading of shared sstables is in, verify that the values @@ -1579,7 +1586,7 @@ func TestConcurrentExcise(t *testing.T) { require.NoError(t, err) require.NoError(t, w.Close()) - _, err = to.IngestAndExcise([]string{sstPath}, sharedSSTs, nil /* external */, KeyRange{Start: startKey, End: endKey}) + _, err = to.IngestAndExcise([]string{sstPath}, sharedSSTs, nil, KeyRange{Start: startKey, End: endKey}, false) require.NoError(t, err) return fmt.Sprintf("replicated %d shared SSTs", len(sharedSSTs)) @@ -1980,7 +1987,7 @@ func TestIngestExternal(t *testing.T) { ) require.NoError(t, err) require.NoError(t, w.Close()) - _, err = to.IngestAndExcise([]string{sstPath}, nil /* sharedSSTs */, externalFiles, KeyRange{Start: startKey, End: endKey}) + _, err = to.IngestAndExcise([]string{sstPath}, nil, externalFiles, KeyRange{Start: startKey, End: endKey}, false) require.NoError(t, err) return fmt.Sprintf("replicated %d external SSTs", len(externalFiles)) diff --git a/metamorphic/ops.go b/metamorphic/ops.go index 49ca80feb40..0922c5f823d 100644 --- a/metamorphic/ops.go +++ b/metamorphic/ops.go @@ -875,10 +875,10 @@ func (o *ingestAndExciseOp) run(t *Test, h historyRecorder) { if t.testOpts.useExcise { err = firstError(err, t.withRetries(func() error { - _, err := t.getDB(o.dbID).IngestAndExcise([]string{path}, nil /* sharedSSTs */, nil /* external */, pebble.KeyRange{ + _, err := t.getDB(o.dbID).IngestAndExcise([]string{path}, nil, nil, pebble.KeyRange{ Start: o.exciseStart, End: o.exciseEnd, - }) + }, false) return err })) } else { @@ -1945,7 +1945,7 @@ func (r *replicateOp) runSharedReplicate( return } - _, err = dest.IngestAndExcise([]string{sstPath}, sharedSSTs, nil /* external */, pebble.KeyRange{Start: r.start, End: r.end}) + _, err = dest.IngestAndExcise([]string{sstPath}, sharedSSTs, nil, pebble.KeyRange{Start: r.start, End: r.end}, false) h.Recordf("%s // %v", r, err) } diff --git a/open.go b/open.go index 0217be606f8..7e8b172d936 100644 --- a/open.go +++ b/open.go @@ -933,9 +933,7 @@ func (d *DB) replayWAL( panic("pebble: couldn't load all files in WAL entry.") } - entry, err = d.newIngestedFlushableEntry( - meta, seqNum, base.DiskFileNum(ll.Num), - ) + entry, err = d.newIngestedFlushableEntry(meta, seqNum, base.DiskFileNum(ll.Num), KeyRange{}) if err != nil { return nil, 0, err } diff --git a/testdata/excise b/testdata/excise index 0a1a9515e05..7cd35eaa037 100644 --- a/testdata/excise +++ b/testdata/excise @@ -540,3 +540,115 @@ c: (something, .) c: (something, .) . . + + +# Test to verify that IngestAndExcise now uses flushableIngest when directed to +# do so using a flag that is passed down by the caller. + +reset +---- + +batch +set a foo +set b bar +---- + +batch +set d@6 baz +---- + +flush +---- + +compact a z +---- + +batch +set d@6 something +set g something +---- + +flush +---- + +lsm +---- +0.0: + 000007:[d@6#13,SET-g#14,SET] +6: + 000005:[a#10,SET-d@6#12,SET] + +batch +set x something +---- + +file-only-snapshot s1 +a z +---- +ok + +lsm +---- +0.0: + 000007:[d@6#13,SET-g#14,SET] +6: + 000005:[a#10,SET-d@6#12,SET] + +build ext7 +del d@6 +---- + +ingest ext7 +---- + +lsm +---- +0.1: + 000008:[d@6#16,DEL-d@6#16,DEL] +0.0: + 000007:[d@6#13,SET-g#14,SET] +6: + 000005:[a#10,SET-d@6#12,SET] + +compact c e +---- + +lsm +---- +6: + 000009:[a#0,SET-g#0,SET] + +build ext5 +set c something +set b something +set f something +del b-e +---- + +ingest-and-excise ext5 excise=b-e flushable-ingest +---- +flushable ingest + +lsm +---- +0.0: + 000013:[x#15,SET-x#15,SET] +6: + 000014(000009):[a#0,SET-a#0,SET] + 000010:[b#17,SET-f#17,SET] + 000015(000009):[g#0,SET-g#0,SET] + +iter lower=c upper=e +last +prev +prev +seek-lt dd +prev +prev +---- +c: (something, .) +. +. +c: (something, .) +. +.