From ef18be527bde66062aef938be75ee36544a40129 Mon Sep 17 00:00:00 2001 From: Aaditya Sondhi <20070511+aadityasondhi@users.noreply.github.com> Date: Wed, 13 Mar 2024 10:24:36 -0400 Subject: [PATCH] ingest,compaction: use excise when flushing flushableIngest This patch amends the ingest pipeline to allow for certain cases to use `flushableIngest` in order to avoid waiting on memtable flushes when there is overlap. The current use for this is range snapshots in Cockroach where we include `RANGEDEL`. When we use this path, we also pass the `exciseSpan` down into the flushable to then excise at flush time to still benefit from the use of excise in the ingest path. Fixes #3335. --- compaction.go | 81 ++++++++++++++++++++++++++++++++-------------- data_test.go | 2 +- flushable.go | 5 +++ flushable_test.go | 5 ++- ingest.go | 30 ++++++++++------- ingest_test.go | 8 ++--- metamorphic/ops.go | 6 ++-- open.go | 4 +-- 8 files changed, 92 insertions(+), 49 deletions(-) diff --git a/compaction.go b/compaction.go index 03ab5dec7b..54ae3b79ce 100644 --- a/compaction.go +++ b/compaction.go @@ -1447,17 +1447,66 @@ func (d *DB) runIngestFlush(c *compaction) (*manifest.VersionEdit, error) { var err error var fileToSplit *fileMetadata var ingestSplitFiles []ingestSplitFile - for _, file := range c.flushing[0].flushable.(*ingestedFlushable).files { - suggestSplit := d.opts.Experimental.IngestSplit != nil && d.opts.Experimental.IngestSplit() && - d.FormatMajorVersion() >= FormatVirtualSSTables - level, fileToSplit, err = ingestTargetLevel( - d.newIters, d.tableNewRangeKeyIter, iterOpts, d.opts.Comparer, - c.version, baseLevel, d.mu.compact.inProgress, file.FileMetadata, - suggestSplit, - ) + ingestFlushable := c.flushing[0].flushable.(*ingestedFlushable) + + updateLevelMetricsOnExcise := func(m *fileMetadata, level int, added []newFileEntry) { + levelMetrics := c.metrics[level] + if levelMetrics == nil { + levelMetrics = &LevelMetrics{} + c.metrics[level] = levelMetrics + } +
levelMetrics.NumFiles-- + levelMetrics.Size -= int64(m.Size) + for i := range added { + levelMetrics.NumFiles++ + levelMetrics.Size += int64(added[i].Meta.Size) + } + } + + suggestSplit := d.opts.Experimental.IngestSplit != nil && d.opts.Experimental.IngestSplit() && + d.FormatMajorVersion() >= FormatVirtualSSTables + + replacedFiles := make(map[base.FileNum][]newFileEntry) + for _, file := range ingestFlushable.files { + if ingestFlushable.exciseSpan.Valid() && ingestFlushable.exciseSpan.Contains(d.cmp, file.FileMetadata.Smallest) && ingestFlushable.exciseSpan.Contains(d.cmp, file.FileMetadata.Largest) { + level = 6 + } else { + level, fileToSplit, err = ingestTargetLevel( + d.newIters, d.tableNewRangeKeyIter, iterOpts, d.opts.Comparer, + c.version, baseLevel, d.mu.compact.inProgress, file.FileMetadata, + suggestSplit, + ) + } + + if ingestFlushable.exciseSpan.Valid() { + // Iterate through all levels and find files that intersect with exciseSpan. + for level = range c.version.Levels { + overlaps := c.version.Overlaps(level, ingestFlushable.exciseSpan.Start, ingestFlushable.exciseSpan.End, true /* exclusiveEnd */) + iter := overlaps.Iter() + + for m := iter.First(); m != nil; m = iter.Next() { + newFiles, err := d.excise(ingestFlushable.exciseSpan, m, ve, level) + if err != nil { + return nil, err + } + + if _, ok := ve.DeletedFiles[deletedFileEntry{ + Level: level, + FileNum: m.FileNum, + }]; !ok { + // We did not excise this file. + continue + } + replacedFiles[m.FileNum] = newFiles + updateLevelMetricsOnExcise(m, level, newFiles) + } + } + } if err != nil { return nil, err } + + // Add the current flushableIngest file to the version. 
ve.NewFiles = append(ve.NewFiles, newFileEntry{Level: level, Meta: file.FileMetadata}) if fileToSplit != nil { ingestSplitFiles = append(ingestSplitFiles, ingestSplitFile{ @@ -1475,23 +1524,7 @@ func (d *DB) runIngestFlush(c *compaction) (*manifest.VersionEdit, error) { levelMetrics.TablesIngested++ } - updateLevelMetricsOnExcise := func(m *fileMetadata, level int, added []newFileEntry) { - levelMetrics := c.metrics[level] - if levelMetrics == nil { - levelMetrics = &LevelMetrics{} - c.metrics[level] = levelMetrics - } - levelMetrics.NumFiles-- - levelMetrics.Size -= int64(m.Size) - for i := range added { - levelMetrics.NumFiles++ - levelMetrics.Size += int64(added[i].Meta.Size) - } - } - if len(ingestSplitFiles) > 0 { - ve.DeletedFiles = make(map[manifest.DeletedFileEntry]*manifest.FileMetadata) - replacedFiles := make(map[base.FileNum][]newFileEntry) if err := d.ingestSplit(ve, updateLevelMetricsOnExcise, ingestSplitFiles, replacedFiles); err != nil { return nil, err } diff --git a/data_test.go b/data_test.go index 3e5933fe0c..f20b54a8ad 100644 --- a/data_test.go +++ b/data_test.go @@ -1268,7 +1268,7 @@ func runIngestAndExciseCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error { } } - if _, err := d.IngestAndExcise(paths, nil /* shared */, nil /* external */, exciseSpan); err != nil { + if _, err := d.IngestAndExcise(paths, nil, nil, exciseSpan, false); err != nil { return err } return nil diff --git a/flushable.go b/flushable.go index 630191f737..58de177717 100644 --- a/flushable.go +++ b/flushable.go @@ -158,6 +158,9 @@ type ingestedFlushable struct { slice manifest.LevelSlice // hasRangeKeys is set on ingestedFlushable construction. hasRangeKeys bool + // exciseSpan is populated if an excise operation should be performed during + // flush. 
+ exciseSpan KeyRange } func newIngestedFlushable( @@ -165,6 +168,7 @@ func newIngestedFlushable( comparer *Comparer, newIters tableNewIters, newRangeKeyIters keyspanimpl.TableNewSpanIter, + exciseSpan KeyRange, ) *ingestedFlushable { var physicalFiles []physicalMeta var hasRangeKeys bool @@ -183,6 +187,7 @@ func newIngestedFlushable( // slice is immutable and can be set once and used many times. slice: manifest.NewLevelSliceKeySorted(comparer.Compare, files), hasRangeKeys: hasRangeKeys, + exciseSpan: exciseSpan, } return ret diff --git a/flushable_test.go b/flushable_test.go index cceb17745f..d1666d76f1 100644 --- a/flushable_test.go +++ b/flushable_test.go @@ -119,9 +119,8 @@ func TestIngestedSSTFlushableAPI(t *testing.T) { } meta := loadFileMeta(paths) - flushable = newIngestedFlushable( - meta, d.opts.Comparer, d.newIters, d.tableNewRangeKeyIter, - ) + // TODO(aaditya): Add tests + flushable = newIngestedFlushable(meta, d.opts.Comparer, d.newIters, d.tableNewRangeKeyIter, KeyRange{}) return "" case "iter": iter := flushable.newIter(nil) diff --git a/ingest.go b/ingest.go index f4fe59f166..e33b067c51 100644 --- a/ingest.go +++ b/ingest.go @@ -1101,7 +1101,7 @@ func (d *DB) Ingest(paths []string) error { if d.opts.ReadOnly { return ErrReadOnly } - _, err := d.ingest(paths, ingestTargetLevel, nil /* shared */, KeyRange{}, nil /* external */) + _, err := d.ingest(paths, ingestTargetLevel, nil, KeyRange{}, false, nil) return err } @@ -1188,7 +1188,7 @@ func (d *DB) IngestWithStats(paths []string) (IngestOperationStats, error) { if d.opts.ReadOnly { return IngestOperationStats{}, ErrReadOnly } - return d.ingest(paths, ingestTargetLevel, nil /* shared */, KeyRange{}, nil /* external */) + return d.ingest(paths, ingestTargetLevel, nil, KeyRange{}, false, nil) } // IngestExternalFiles does the same as IngestWithStats, and additionally @@ -1206,7 +1206,7 @@ func (d *DB) IngestExternalFiles(external []ExternalFile) (IngestOperationStats, if 
d.opts.Experimental.RemoteStorage == nil { return IngestOperationStats{}, errors.New("pebble: cannot ingest external files without shared storage configured") } - return d.ingest(nil, ingestTargetLevel, nil /* shared */, KeyRange{}, external) + return d.ingest(nil, ingestTargetLevel, nil, KeyRange{}, false, external) } // IngestAndExcise does the same as IngestWithStats, and additionally accepts a @@ -1220,7 +1220,11 @@ func (d *DB) IngestExternalFiles(external []ExternalFile) (IngestOperationStats, // Panics if this DB instance was not instantiated with a remote.Storage and // shared sstables are present. func (d *DB) IngestAndExcise( - paths []string, shared []SharedSSTMeta, external []ExternalFile, exciseSpan KeyRange, + paths []string, + shared []SharedSSTMeta, + external []ExternalFile, + exciseSpan KeyRange, + doFlushableIngest bool, ) (IngestOperationStats, error) { if err := d.closed.Load(); err != nil { panic(err) @@ -1243,12 +1247,12 @@ func (d *DB) IngestAndExcise( v, FormatMinForSharedObjects, ) } - return d.ingest(paths, ingestTargetLevel, shared, exciseSpan, external) + return d.ingest(paths, ingestTargetLevel, shared, exciseSpan, doFlushableIngest, external) } // Both DB.mu and commitPipeline.mu must be held while this is called. func (d *DB) newIngestedFlushableEntry( - meta []*fileMetadata, seqNum uint64, logNum base.DiskFileNum, + meta []*fileMetadata, seqNum uint64, logNum base.DiskFileNum, exciseSpan KeyRange, ) (*flushableEntry, error) { // Update the sequence number for all of the sstables in the // metadata. 
Writing the metadata to the manifest when the @@ -1264,7 +1268,7 @@ func (d *DB) newIngestedFlushableEntry( } } - f := newIngestedFlushable(meta, d.opts.Comparer, d.newIters, d.tableNewRangeKeyIter) + f := newIngestedFlushable(meta, d.opts.Comparer, d.newIters, d.tableNewRangeKeyIter, exciseSpan) // NB: The logNum/seqNum are the WAL number which we're writing this entry // to and the sequence number within the WAL which we'll write this entry @@ -1294,7 +1298,9 @@ func (d *DB) newIngestedFlushableEntry( // we're holding both locks, the order in which we rotate the memtable or // recycle the WAL in this function is irrelevant as long as the correct log // numbers are assigned to the appropriate flushable. -func (d *DB) handleIngestAsFlushable(meta []*fileMetadata, seqNum uint64) error { +func (d *DB) handleIngestAsFlushable( + meta []*fileMetadata, seqNum uint64, exciseSpan KeyRange, +) error { b := d.NewBatch() for _, m := range meta { b.ingestSST(m.FileNum) @@ -1320,7 +1326,7 @@ func (d *DB) handleIngestAsFlushable(meta []*fileMetadata, seqNum uint64) error d.mu.Lock() } - entry, err := d.newIngestedFlushableEntry(meta, seqNum, logNum) + entry, err := d.newIngestedFlushableEntry(meta, seqNum, logNum, exciseSpan) if err != nil { return err } @@ -1359,6 +1365,7 @@ func (d *DB) ingest( targetLevelFunc ingestTargetLevelFunc, shared []SharedSSTMeta, exciseSpan KeyRange, + doFlushableIngest bool, external []ExternalFile, ) (IngestOperationStats, error) { if len(shared) > 0 && d.opts.Experimental.RemoteStorage == nil { @@ -1555,7 +1562,7 @@ func (d *DB) ingest( } // The ingestion overlaps with some entry in the flushable queue. 
if d.FormatMajorVersion() < FormatFlushableIngest || - d.opts.Experimental.DisableIngestAsFlushable() || + d.opts.Experimental.DisableIngestAsFlushable() || !doFlushableIngest || len(shared) > 0 || exciseSpan.Valid() || len(external) > 0 || (len(d.mu.mem.queue) > d.opts.MemTableStopWritesThreshold-1) { // We're not able to ingest as a flushable, @@ -1589,7 +1596,7 @@ func (d *DB) ingest( for i := range fileMetas { fileMetas[i] = loadResult.local[i].fileMetadata } - err = d.handleIngestAsFlushable(fileMetas, seqNum) + err = d.handleIngestAsFlushable(fileMetas, seqNum, exciseSpan) } var ve *versionEdit @@ -2420,6 +2427,7 @@ func (d *DB) ingestApply( } } + // TODO(aaditya): should this metric also be incremented for flushableIngests? d.mu.versions.metrics.Ingest.Count++ d.updateReadStateLocked(d.opts.DebugCheck) diff --git a/ingest_test.go b/ingest_test.go index acc29b7f06..b3e7a2e6f1 100644 --- a/ingest_test.go +++ b/ingest_test.go @@ -1090,7 +1090,7 @@ func testIngestSharedImpl( require.NoError(t, err) require.NoError(t, w.Close()) - _, err = to.IngestAndExcise([]string{sstPath}, sharedSSTs, nil /* external */, KeyRange{Start: startKey, End: endKey}) + _, err = to.IngestAndExcise([]string{sstPath}, sharedSSTs, nil, KeyRange{Start: startKey, End: endKey}, false) require.NoError(t, err) return fmt.Sprintf("replicated %d shared SSTs", len(sharedSSTs)) @@ -1318,7 +1318,7 @@ func TestSimpleIngestShared(t *testing.T) { Level: 6, Size: uint64(size + 5), } - _, err = d.IngestAndExcise([]string{}, []SharedSSTMeta{sharedSSTMeta}, nil /* external */, KeyRange{Start: []byte("d"), End: []byte("ee")}) + _, err = d.IngestAndExcise([]string{}, []SharedSSTMeta{sharedSSTMeta}, nil, KeyRange{Start: []byte("d"), End: []byte("ee")}, false) require.NoError(t, err) // TODO(bilal): Once reading of shared sstables is in, verify that the values @@ -1579,7 +1579,7 @@ func TestConcurrentExcise(t *testing.T) { require.NoError(t, err) require.NoError(t, w.Close()) - _, err = 
to.IngestAndExcise([]string{sstPath}, sharedSSTs, nil /* external */, KeyRange{Start: startKey, End: endKey}) + _, err = to.IngestAndExcise([]string{sstPath}, sharedSSTs, nil, KeyRange{Start: startKey, End: endKey}, false) require.NoError(t, err) return fmt.Sprintf("replicated %d shared SSTs", len(sharedSSTs)) @@ -1980,7 +1980,7 @@ func TestIngestExternal(t *testing.T) { ) require.NoError(t, err) require.NoError(t, w.Close()) - _, err = to.IngestAndExcise([]string{sstPath}, nil /* sharedSSTs */, externalFiles, KeyRange{Start: startKey, End: endKey}) + _, err = to.IngestAndExcise([]string{sstPath}, nil, externalFiles, KeyRange{Start: startKey, End: endKey}, false) require.NoError(t, err) return fmt.Sprintf("replicated %d external SSTs", len(externalFiles)) diff --git a/metamorphic/ops.go b/metamorphic/ops.go index 49ca80feb4..0922c5f823 100644 --- a/metamorphic/ops.go +++ b/metamorphic/ops.go @@ -875,10 +875,10 @@ func (o *ingestAndExciseOp) run(t *Test, h historyRecorder) { if t.testOpts.useExcise { err = firstError(err, t.withRetries(func() error { - _, err := t.getDB(o.dbID).IngestAndExcise([]string{path}, nil /* sharedSSTs */, nil /* external */, pebble.KeyRange{ + _, err := t.getDB(o.dbID).IngestAndExcise([]string{path}, nil, nil, pebble.KeyRange{ Start: o.exciseStart, End: o.exciseEnd, - }) + }, false) return err })) } else { @@ -1945,7 +1945,7 @@ func (r *replicateOp) runSharedReplicate( return } - _, err = dest.IngestAndExcise([]string{sstPath}, sharedSSTs, nil /* external */, pebble.KeyRange{Start: r.start, End: r.end}) + _, err = dest.IngestAndExcise([]string{sstPath}, sharedSSTs, nil, pebble.KeyRange{Start: r.start, End: r.end}, false) h.Recordf("%s // %v", r, err) } diff --git a/open.go b/open.go index 0217be606f..7e8b172d93 100644 --- a/open.go +++ b/open.go @@ -933,9 +933,7 @@ func (d *DB) replayWAL( panic("pebble: couldn't load all files in WAL entry.") } - entry, err = d.newIngestedFlushableEntry( - meta, seqNum, base.DiskFileNum(ll.Num), - ) + 
entry, err = d.newIngestedFlushableEntry(meta, seqNum, base.DiskFileNum(ll.Num), KeyRange{}) if err != nil { return nil, 0, err }