From ba3ea2a161f10a59830f76718bfeeb30709c1d44 Mon Sep 17 00:00:00 2001 From: Aaditya Sondhi <20070511+aadityasondhi@users.noreply.github.com> Date: Wed, 13 Mar 2024 10:24:36 -0400 Subject: [PATCH] ingest,compaction: use excise when flushing flushableIngest This patch amends the ingest pipeline to allow for certain cases to use `flushableIngest` in order to avoid waiting on memtable flushes when there is overlap. The current use for this is range snapshots in Cockroach where we include `RANGEDEL`. When we use this path, we also pass the `exciseSpan` down into the flushable to then excise at flush time to still benefit from the use of excise in the ingest path. Fixes #3335. --- compaction.go | 83 +++++++++++++++++++++++---------- data_test.go | 5 +- flushable.go | 5 ++ flushable_test.go | 4 +- ingest.go | 46 +++++++++++++------ ingest_test.go | 15 ++++-- metamorphic/ops.go | 6 +-- open.go | 4 +- testdata/excise | 112 +++++++++++++++++++++++++++++++++++++++++++++ 9 files changed, 227 insertions(+), 53 deletions(-) diff --git a/compaction.go b/compaction.go index dafe36d088e..cb8938f6d9e 100644 --- a/compaction.go +++ b/compaction.go @@ -1448,17 +1448,68 @@ func (d *DB) runIngestFlush(c *compaction) (*manifest.VersionEdit, error) { var err error var fileToSplit *fileMetadata var ingestSplitFiles []ingestSplitFile - for _, file := range c.flushing[0].flushable.(*ingestedFlushable).files { - suggestSplit := d.opts.Experimental.IngestSplit != nil && d.opts.Experimental.IngestSplit() && - d.FormatMajorVersion() >= FormatVirtualSSTables - level, fileToSplit, err = ingestTargetLevel( - d.newIters, d.tableNewRangeKeyIter, iterOpts, d.opts.Comparer, - c.version, baseLevel, d.mu.compact.inProgress, file.FileMetadata, - suggestSplit, - ) + ingestFlushable := c.flushing[0].flushable.(*ingestedFlushable) + + updateLevelMetricsOnExcise := func(m *fileMetadata, level int, added []newFileEntry) { + levelMetrics := c.metrics[level] + if levelMetrics == nil { + levelMetrics = 
&LevelMetrics{} + c.metrics[level] = levelMetrics + } + levelMetrics.NumFiles-- + levelMetrics.Size -= int64(m.Size) + for i := range added { + levelMetrics.NumFiles++ + levelMetrics.Size += int64(added[i].Meta.Size) + } + } + + suggestSplit := d.opts.Experimental.IngestSplit != nil && d.opts.Experimental.IngestSplit() && + d.FormatMajorVersion() >= FormatVirtualSSTables + + replacedFiles := make(map[base.FileNum][]newFileEntry) + for _, file := range ingestFlushable.files { + // This file fits perfectly within the excise span, so we can slot it at L6. + if ingestFlushable.exciseSpan.Valid() && ingestFlushable.exciseSpan.Contains(d.cmp, file.FileMetadata.Smallest) && ingestFlushable.exciseSpan.Contains(d.cmp, file.FileMetadata.Largest) { + level = 6 + } else { + level, fileToSplit, err = ingestTargetLevel( + d.newIters, d.tableNewRangeKeyIter, iterOpts, d.opts.Comparer, + c.version, baseLevel, d.mu.compact.inProgress, file.FileMetadata, + suggestSplit, + ) + } + + if ingestFlushable.exciseSpan.Valid() { + ve.DeletedFiles = map[manifest.DeletedFileEntry]*manifest.FileMetadata{} + // Iterate through all levels and find files that intersect with exciseSpan. + for level = range c.version.Levels { + overlaps := c.version.Overlaps(level, base.UserKeyBoundsEndExclusive(ingestFlushable.exciseSpan.Start, ingestFlushable.exciseSpan.End)) + iter := overlaps.Iter() + + for m := iter.First(); m != nil; m = iter.Next() { + newFiles, err := d.excise(ingestFlushable.exciseSpan, m, ve, level) + if err != nil { + return nil, err + } + + if _, ok := ve.DeletedFiles[deletedFileEntry{ + Level: level, + FileNum: m.FileNum, + }]; !ok { + // We did not excise this file. + continue + } + replacedFiles[m.FileNum] = newFiles + updateLevelMetricsOnExcise(m, level, newFiles) + } + } + } if err != nil { return nil, err } + + // Add the current flushableIngest file to the version. 
ve.NewFiles = append(ve.NewFiles, newFileEntry{Level: level, Meta: file.FileMetadata}) if fileToSplit != nil { ingestSplitFiles = append(ingestSplitFiles, ingestSplitFile{ @@ -1476,23 +1527,7 @@ func (d *DB) runIngestFlush(c *compaction) (*manifest.VersionEdit, error) { levelMetrics.TablesIngested++ } - updateLevelMetricsOnExcise := func(m *fileMetadata, level int, added []newFileEntry) { - levelMetrics := c.metrics[level] - if levelMetrics == nil { - levelMetrics = &LevelMetrics{} - c.metrics[level] = levelMetrics - } - levelMetrics.NumFiles-- - levelMetrics.Size -= int64(m.Size) - for i := range added { - levelMetrics.NumFiles++ - levelMetrics.Size += int64(added[i].Meta.Size) - } - } - if len(ingestSplitFiles) > 0 { - ve.DeletedFiles = make(map[manifest.DeletedFileEntry]*manifest.FileMetadata) - replacedFiles := make(map[base.FileNum][]newFileEntry) if err := d.ingestSplit(ve, updateLevelMetricsOnExcise, ingestSplitFiles, replacedFiles); err != nil { return nil, err } diff --git a/data_test.go b/data_test.go index aa639f2e866..62027ef4870 100644 --- a/data_test.go +++ b/data_test.go @@ -1258,6 +1258,7 @@ func (d *DB) waitTableStats() { func runIngestAndExciseCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error { var exciseSpan KeyRange + var doFlushableIngest bool paths := make([]string, 0, len(td.CmdArgs)) for i, arg := range td.CmdArgs { switch td.CmdArgs[i].Key { @@ -1271,12 +1272,14 @@ func runIngestAndExciseCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error { } exciseSpan.Start = []byte(fields[0]) exciseSpan.End = []byte(fields[1]) + case "flushable-ingest": + doFlushableIngest = true default: paths = append(paths, arg.String()) } } - if _, err := d.IngestAndExcise(paths, nil /* shared */, nil /* external */, exciseSpan); err != nil { + if _, err := d.IngestAndExcise(paths, nil, nil, exciseSpan, doFlushableIngest); err != nil { return err } return nil diff --git a/flushable.go b/flushable.go index 08364e6f811..f243564d197 100644 --- a/flushable.go +++ 
b/flushable.go @@ -162,6 +162,9 @@ type ingestedFlushable struct { slice manifest.LevelSlice // hasRangeKeys is set on ingestedFlushable construction. hasRangeKeys bool + // exciseSpan is populated if an excise operation should be performed during + // flush. + exciseSpan KeyRange } func newIngestedFlushable( @@ -169,6 +172,7 @@ func newIngestedFlushable( comparer *Comparer, newIters tableNewIters, newRangeKeyIters keyspanimpl.TableNewSpanIter, + exciseSpan KeyRange, ) *ingestedFlushable { if invariants.Enabled { for i := 1; i < len(files); i++ { @@ -196,6 +200,7 @@ func newIngestedFlushable( // slice is immutable and can be set once and used many times. slice: manifest.NewLevelSliceKeySorted(comparer.Compare, files), hasRangeKeys: hasRangeKeys, + exciseSpan: exciseSpan, } return ret diff --git a/flushable_test.go b/flushable_test.go index 91ed10f1354..06b3d8f48a2 100644 --- a/flushable_test.go +++ b/flushable_test.go @@ -119,9 +119,7 @@ func TestIngestedSSTFlushableAPI(t *testing.T) { } meta := loadFileMeta(paths) - flushable = newIngestedFlushable( - meta, d.opts.Comparer, d.newIters, d.tableNewRangeKeyIter, - ) + flushable = newIngestedFlushable(meta, d.opts.Comparer, d.newIters, d.tableNewRangeKeyIter, KeyRange{}) return "" case "iter": iter := flushable.newIter(nil) diff --git a/ingest.go b/ingest.go index 77d23ffc6bd..ac7325dd5d5 100644 --- a/ingest.go +++ b/ingest.go @@ -1162,7 +1162,7 @@ func (d *DB) Ingest(paths []string) error { if d.opts.ReadOnly { return ErrReadOnly } - _, err := d.ingest(paths, ingestTargetLevel, nil /* shared */, KeyRange{}, nil /* external */) + _, err := d.ingest(paths, ingestTargetLevel, nil, KeyRange{}, false, nil) return err } @@ -1250,7 +1250,7 @@ func (d *DB) IngestWithStats(paths []string) (IngestOperationStats, error) { if d.opts.ReadOnly { return IngestOperationStats{}, ErrReadOnly } - return d.ingest(paths, ingestTargetLevel, nil /* shared */, KeyRange{}, nil /* external */) + return d.ingest(paths, ingestTargetLevel, nil, 
KeyRange{}, false, nil) } // IngestExternalFiles does the same as IngestWithStats, and additionally @@ -1268,7 +1268,7 @@ func (d *DB) IngestExternalFiles(external []ExternalFile) (IngestOperationStats, if d.opts.Experimental.RemoteStorage == nil { return IngestOperationStats{}, errors.New("pebble: cannot ingest external files without shared storage configured") } - return d.ingest(nil, ingestTargetLevel, nil /* shared */, KeyRange{}, external) + return d.ingest(nil, ingestTargetLevel, nil, KeyRange{}, false, external) } // IngestAndExcise does the same as IngestWithStats, and additionally accepts a @@ -1282,7 +1282,11 @@ func (d *DB) IngestExternalFiles(external []ExternalFile) (IngestOperationStats, // Panics if this DB instance was not instantiated with a remote.Storage and // shared sstables are present. func (d *DB) IngestAndExcise( - paths []string, shared []SharedSSTMeta, external []ExternalFile, exciseSpan KeyRange, + paths []string, + shared []SharedSSTMeta, + external []ExternalFile, + exciseSpan KeyRange, + sstsContainExciseTombstone bool, ) (IngestOperationStats, error) { if err := d.closed.Load(); err != nil { panic(err) @@ -1305,12 +1309,12 @@ func (d *DB) IngestAndExcise( v, FormatMinForSharedObjects, ) } - return d.ingest(paths, ingestTargetLevel, shared, exciseSpan, external) + return d.ingest(paths, ingestTargetLevel, shared, exciseSpan, sstsContainExciseTombstone, external) } // Both DB.mu and commitPipeline.mu must be held while this is called. func (d *DB) newIngestedFlushableEntry( - meta []*fileMetadata, seqNum uint64, logNum base.DiskFileNum, + meta []*fileMetadata, seqNum uint64, logNum base.DiskFileNum, exciseSpan KeyRange, ) (*flushableEntry, error) { // Update the sequence number for all of the sstables in the // metadata. 
Writing the metadata to the manifest when the @@ -1326,7 +1330,7 @@ func (d *DB) newIngestedFlushableEntry( } } - f := newIngestedFlushable(meta, d.opts.Comparer, d.newIters, d.tableNewRangeKeyIter) + f := newIngestedFlushable(meta, d.opts.Comparer, d.newIters, d.tableNewRangeKeyIter, exciseSpan) // NB: The logNum/seqNum are the WAL number which we're writing this entry // to and the sequence number within the WAL which we'll write this entry @@ -1356,7 +1360,9 @@ func (d *DB) newIngestedFlushableEntry( // we're holding both locks, the order in which we rotate the memtable or // recycle the WAL in this function is irrelevant as long as the correct log // numbers are assigned to the appropriate flushable. -func (d *DB) handleIngestAsFlushable(meta []*fileMetadata, seqNum uint64) error { +func (d *DB) handleIngestAsFlushable( + meta []*fileMetadata, seqNum uint64, exciseSpan KeyRange, +) error { b := d.NewBatch() for _, m := range meta { b.ingestSST(m.FileNum) @@ -1382,7 +1388,7 @@ func (d *DB) handleIngestAsFlushable(meta []*fileMetadata, seqNum uint64) error d.mu.Lock() } - entry, err := d.newIngestedFlushableEntry(meta, seqNum, logNum) + entry, err := d.newIngestedFlushableEntry(meta, seqNum, logNum, exciseSpan) if err != nil { return err } @@ -1411,6 +1417,7 @@ func (d *DB) handleIngestAsFlushable(meta []*fileMetadata, seqNum uint64) error d.mu.mem.queue = append(d.mu.mem.queue, entry) d.rotateMemtable(newLogNum, nextSeqNum, currMem) d.updateReadStateLocked(d.opts.DebugCheck) + // TODO(aaditya): is this necessary? 
we call this already in rotateMemtable above d.maybeScheduleFlush() return nil } @@ -1421,6 +1428,7 @@ func (d *DB) ingest( targetLevelFunc ingestTargetLevelFunc, shared []SharedSSTMeta, exciseSpan KeyRange, + sstsContainExciseTombstone bool, external []ExternalFile, ) (IngestOperationStats, error) { if len(shared) > 0 && d.opts.Experimental.RemoteStorage == nil { @@ -1621,11 +1629,19 @@ func (d *DB) ingest( mut.writerRef() return } - // The ingestion overlaps with some entry in the flushable queue. - if d.FormatMajorVersion() < FormatFlushableIngest || - d.opts.Experimental.DisableIngestAsFlushable() || - len(shared) > 0 || exciseSpan.Valid() || len(external) > 0 || - (len(d.mu.mem.queue) > d.opts.MemTableStopWritesThreshold-1) { + + // The ingestion overlaps with some entry in the flushable queue. If the + // pre-conditions are met below, we can treat this ingestion as a flushable + // ingest, otherwise we wait on the memtable flush before ingestion. + // + // TODO(aaditya): We should make flushableIngest compatible with remote + // files. + hasRemoteFiles := len(shared) > 0 || len(external) > 0 + canIngestFlushable := d.FormatMajorVersion() >= FormatFlushableIngest && + (len(d.mu.mem.queue) < d.opts.MemTableStopWritesThreshold) && + !d.opts.Experimental.DisableIngestAsFlushable() && !hasRemoteFiles + + if !canIngestFlushable || (exciseSpan.Valid() && !sstsContainExciseTombstone) { // We're not able to ingest as a flushable, // so we must synchronously flush. 
// @@ -1657,7 +1673,7 @@ func (d *DB) ingest( for i := range fileMetas { fileMetas[i] = loadResult.local[i].fileMetadata } - err = d.handleIngestAsFlushable(fileMetas, seqNum) + err = d.handleIngestAsFlushable(fileMetas, seqNum, exciseSpan) } var ve *versionEdit diff --git a/ingest_test.go b/ingest_test.go index 4aa654d8af3..4d393404aaf 100644 --- a/ingest_test.go +++ b/ingest_test.go @@ -696,6 +696,9 @@ func TestExcise(t *testing.T) { case "ingest-and-excise": flushed = false + d.mu.Lock() + prevFlushableIngests := d.mu.versions.metrics.Flush.AsIngestCount + d.mu.Unlock() if err := runIngestAndExciseCmd(td, d, mem); err != nil { return err.Error() } @@ -704,8 +707,12 @@ func TestExcise(t *testing.T) { for d.mu.compact.flushing { d.mu.compact.cond.Wait() } + flushableIngests := d.mu.versions.metrics.Flush.AsIngestCount d.mu.Unlock() if flushed { + if prevFlushableIngests < flushableIngests { + return "flushable ingest" + } return "memtable flushed" } return "" @@ -1091,7 +1098,7 @@ func testIngestSharedImpl( require.NoError(t, err) require.NoError(t, w.Close()) - _, err = to.IngestAndExcise([]string{sstPath}, sharedSSTs, nil /* external */, KeyRange{Start: startKey, End: endKey}) + _, err = to.IngestAndExcise([]string{sstPath}, sharedSSTs, nil /* external */, KeyRange{Start: startKey, End: endKey}, false) require.NoError(t, err) return fmt.Sprintf("replicated %d shared SSTs", len(sharedSSTs)) @@ -1320,7 +1327,7 @@ func TestSimpleIngestShared(t *testing.T) { Level: 6, Size: uint64(size + 5), } - _, err = d.IngestAndExcise([]string{}, []SharedSSTMeta{sharedSSTMeta}, nil /* external */, KeyRange{Start: []byte("d"), End: []byte("ee")}) + _, err = d.IngestAndExcise([]string{}, []SharedSSTMeta{sharedSSTMeta}, nil /* external */, KeyRange{Start: []byte("d"), End: []byte("ee")}, false) require.NoError(t, err) // TODO(bilal): Once reading of shared sstables is in, verify that the values @@ -1582,7 +1589,7 @@ func TestConcurrentExcise(t *testing.T) { require.NoError(t, err) 
require.NoError(t, w.Close()) - _, err = to.IngestAndExcise([]string{sstPath}, sharedSSTs, nil /* external */, KeyRange{Start: startKey, End: endKey}) + _, err = to.IngestAndExcise([]string{sstPath}, sharedSSTs, nil, KeyRange{Start: startKey, End: endKey}, false) require.NoError(t, err) return fmt.Sprintf("replicated %d shared SSTs", len(sharedSSTs)) @@ -1991,7 +1998,7 @@ func TestIngestExternal(t *testing.T) { ) require.NoError(t, err) require.NoError(t, w.Close()) - _, err = to.IngestAndExcise([]string{sstPath}, nil /* sharedSSTs */, externalFiles, KeyRange{Start: startKey, End: endKey}) + _, err = to.IngestAndExcise([]string{sstPath}, nil /* shared */, externalFiles, KeyRange{Start: startKey, End: endKey}, false) require.NoError(t, err) return fmt.Sprintf("replicated %d external SSTs", len(externalFiles)) diff --git a/metamorphic/ops.go b/metamorphic/ops.go index 4339bd91d81..daddfee5d9b 100644 --- a/metamorphic/ops.go +++ b/metamorphic/ops.go @@ -923,10 +923,10 @@ func (o *ingestAndExciseOp) run(t *Test, h historyRecorder) { if t.testOpts.useExcise { err = firstError(err, t.withRetries(func() error { - _, err := t.getDB(o.dbID).IngestAndExcise([]string{path}, nil /* sharedSSTs */, nil /* external */, pebble.KeyRange{ + _, err := t.getDB(o.dbID).IngestAndExcise([]string{path}, nil /* shared */, nil /* external */, pebble.KeyRange{ Start: o.exciseStart, End: o.exciseEnd, - }) + }, false) return err })) } else { @@ -1988,7 +1988,7 @@ func (r *replicateOp) runSharedReplicate( return } - _, err = dest.IngestAndExcise([]string{sstPath}, sharedSSTs, nil /* external */, pebble.KeyRange{Start: r.start, End: r.end}) + _, err = dest.IngestAndExcise([]string{sstPath}, sharedSSTs, nil /* external */, pebble.KeyRange{Start: r.start, End: r.end}, false) h.Recordf("%s // %v", r, err) } diff --git a/open.go b/open.go index b4bd52aef57..52c9cc461d7 100644 --- a/open.go +++ b/open.go @@ -936,9 +936,7 @@ func (d *DB) replayWAL( panic("pebble: couldn't load all files in WAL 
entry.") } - entry, err = d.newIngestedFlushableEntry( - meta, seqNum, base.DiskFileNum(ll.Num), - ) + entry, err = d.newIngestedFlushableEntry(meta, seqNum, base.DiskFileNum(ll.Num), KeyRange{}) if err != nil { return nil, 0, err } diff --git a/testdata/excise b/testdata/excise index 391af7afaeb..05a2bf4c5d7 100644 --- a/testdata/excise +++ b/testdata/excise @@ -540,3 +540,115 @@ c: (something, .) c: (something, .) . . + + +# Test to verify that IngestAndExcise now uses flushableIngest when directed to +# do so using a flag that is passed down by the caller. + +reset +---- + +batch +set a foo +set b bar +---- + +batch +set d@6 baz +---- + +flush +---- + +compact a z +---- + +batch +set d@6 something +set g something +---- + +flush +---- + +lsm +---- +0.0: + 000007:[d@6#13,SET-g#14,SET] +6: + 000005:[a#10,SET-d@6#12,SET] + +batch +set x something +---- + +file-only-snapshot s1 +a z +---- +ok + +lsm +---- +0.0: + 000007:[d@6#13,SET-g#14,SET] +6: + 000005:[a#10,SET-d@6#12,SET] + +build ext7 +del d@6 +---- + +ingest ext7 +---- + +lsm +---- +0.1: + 000008:[d@6#16,DEL-d@6#16,DEL] +0.0: + 000007:[d@6#13,SET-g#14,SET] +6: + 000005:[a#10,SET-d@6#12,SET] + +compact c e +---- + +lsm +---- +6: + 000009:[a#0,SET-g#0,SET] + +build ext5 +set c something +set b something +set f something +del b-e +---- + +ingest-and-excise ext5 excise=b-e flushable-ingest +---- +flushable ingest + +lsm +---- +0.0: + 000013:[x#15,SET-x#15,SET] +6: + 000014(000009):[a#0,SET-a#0,SET] + 000010:[b#17,SET-f#17,SET] + 000015(000009):[g#0,SET-g#0,SET] + +iter lower=c upper=e +last +prev +prev +seek-lt dd +prev +prev +---- +c: (something, .) +. +. +c: (something, .) +. +.