Skip to content

Commit

Permalink
ingest,compaction: use excise when flushing flushableIngest
Browse files Browse the repository at this point in the history
This patch amends the ingest pipeline to allow for certain cases to use
`flushaleIngest` in order to avoid waiting on memtable flushes when
there is overlap. The current use for this is range snapshots in
Cockroach where we include `RANGEDEL`. When we use this path, we also
pass the `exciseSpan` down down into the flushable to then excise at
flush time to still benefit from the use of excise in the ingest path.

Fixes cockroachdb#3335.
  • Loading branch information
aadityasondhi committed Mar 20, 2024
1 parent 702f8cc commit fbfa1d2
Show file tree
Hide file tree
Showing 9 changed files with 229 additions and 53 deletions.
85 changes: 61 additions & 24 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -1448,17 +1448,70 @@ func (d *DB) runIngestFlush(c *compaction) (*manifest.VersionEdit, error) {
var err error
var fileToSplit *fileMetadata
var ingestSplitFiles []ingestSplitFile
for _, file := range c.flushing[0].flushable.(*ingestedFlushable).files {
suggestSplit := d.opts.Experimental.IngestSplit != nil && d.opts.Experimental.IngestSplit() &&
d.FormatMajorVersion() >= FormatVirtualSSTables
level, fileToSplit, err = ingestTargetLevel(
d.newIters, d.tableNewRangeKeyIter, iterOpts, d.opts.Comparer,
c.version, baseLevel, d.mu.compact.inProgress, file.FileMetadata,
suggestSplit,
)
ingestFlushable := c.flushing[0].flushable.(*ingestedFlushable)

updateLevelMetricsOnExcise := func(m *fileMetadata, level int, added []newFileEntry) {
levelMetrics := c.metrics[level]
if levelMetrics == nil {
levelMetrics = &LevelMetrics{}
c.metrics[level] = levelMetrics
}
levelMetrics.NumFiles--
levelMetrics.Size -= int64(m.Size)
for i := range added {
levelMetrics.NumFiles++
levelMetrics.Size += int64(added[i].Meta.Size)
}
}

suggestSplit := d.opts.Experimental.IngestSplit != nil && d.opts.Experimental.IngestSplit() &&
d.FormatMajorVersion() >= FormatVirtualSSTables

replacedFiles := make(map[base.FileNum][]newFileEntry)
for _, file := range ingestFlushable.files {
// This file fits perfectly within the excise span, so we can slot it at L6.
if ingestFlushable.exciseSpan.Valid() &&
ingestFlushable.exciseSpan.Contains(d.cmp, file.FileMetadata.Smallest) &&
ingestFlushable.exciseSpan.Contains(d.cmp, file.FileMetadata.Largest) {
level = 6
} else {
level, fileToSplit, err = ingestTargetLevel(
d.newIters, d.tableNewRangeKeyIter, iterOpts, d.opts.Comparer,
c.version, baseLevel, d.mu.compact.inProgress, file.FileMetadata,
suggestSplit,
)
}

if ingestFlushable.exciseSpan.Valid() {
ve.DeletedFiles = map[manifest.DeletedFileEntry]*manifest.FileMetadata{}
// Iterate through all levels and find files that intersect with exciseSpan.
for level = range c.version.Levels {
overlaps := c.version.Overlaps(level, base.UserKeyBoundsEndExclusive(ingestFlushable.exciseSpan.Start, ingestFlushable.exciseSpan.End))
iter := overlaps.Iter()

for m := iter.First(); m != nil; m = iter.Next() {
newFiles, err := d.excise(ingestFlushable.exciseSpan, m, ve, level)
if err != nil {
return nil, err
}

if _, ok := ve.DeletedFiles[deletedFileEntry{
Level: level,
FileNum: m.FileNum,
}]; !ok {
// We did not excise this file.
continue
}
replacedFiles[m.FileNum] = newFiles
updateLevelMetricsOnExcise(m, level, newFiles)
}
}
}
if err != nil {
return nil, err
}

// Add the current flushableIngest file to the version.
ve.NewFiles = append(ve.NewFiles, newFileEntry{Level: level, Meta: file.FileMetadata})
if fileToSplit != nil {
ingestSplitFiles = append(ingestSplitFiles, ingestSplitFile{
Expand All @@ -1476,23 +1529,7 @@ func (d *DB) runIngestFlush(c *compaction) (*manifest.VersionEdit, error) {
levelMetrics.TablesIngested++
}

updateLevelMetricsOnExcise := func(m *fileMetadata, level int, added []newFileEntry) {
levelMetrics := c.metrics[level]
if levelMetrics == nil {
levelMetrics = &LevelMetrics{}
c.metrics[level] = levelMetrics
}
levelMetrics.NumFiles--
levelMetrics.Size -= int64(m.Size)
for i := range added {
levelMetrics.NumFiles++
levelMetrics.Size += int64(added[i].Meta.Size)
}
}

if len(ingestSplitFiles) > 0 {
ve.DeletedFiles = make(map[manifest.DeletedFileEntry]*manifest.FileMetadata)
replacedFiles := make(map[base.FileNum][]newFileEntry)
if err := d.ingestSplit(ve, updateLevelMetricsOnExcise, ingestSplitFiles, replacedFiles); err != nil {
return nil, err
}
Expand Down
5 changes: 4 additions & 1 deletion data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1258,6 +1258,7 @@ func (d *DB) waitTableStats() {

func runIngestAndExciseCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error {
var exciseSpan KeyRange
var sstContainsExciseTombstone bool
paths := make([]string, 0, len(td.CmdArgs))
for i, arg := range td.CmdArgs {
switch td.CmdArgs[i].Key {
Expand All @@ -1271,12 +1272,14 @@ func runIngestAndExciseCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error {
}
exciseSpan.Start = []byte(fields[0])
exciseSpan.End = []byte(fields[1])
case "contains-excise-tombstone":
sstContainsExciseTombstone = true
default:
paths = append(paths, arg.String())
}
}

if _, err := d.IngestAndExcise(paths, nil /* shared */, nil /* external */, exciseSpan); err != nil {
if _, err := d.IngestAndExcise(paths, nil /* shared */, nil /* external */, exciseSpan, sstContainsExciseTombstone); err != nil {
return err
}
return nil
Expand Down
5 changes: 5 additions & 0 deletions flushable.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,13 +162,17 @@ type ingestedFlushable struct {
slice manifest.LevelSlice
// hasRangeKeys is set on ingestedFlushable construction.
hasRangeKeys bool
// exciseSpan is populated if an excise operation should be performed during
// flush.
exciseSpan KeyRange
}

func newIngestedFlushable(
files []*fileMetadata,
comparer *Comparer,
newIters tableNewIters,
newRangeKeyIters keyspanimpl.TableNewSpanIter,
exciseSpan KeyRange,
) *ingestedFlushable {
if invariants.Enabled {
for i := 1; i < len(files); i++ {
Expand Down Expand Up @@ -196,6 +200,7 @@ func newIngestedFlushable(
// slice is immutable and can be set once and used many times.
slice: manifest.NewLevelSliceKeySorted(comparer.Compare, files),
hasRangeKeys: hasRangeKeys,
exciseSpan: exciseSpan,
}

return ret
Expand Down
4 changes: 1 addition & 3 deletions flushable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,7 @@ func TestIngestedSSTFlushableAPI(t *testing.T) {
}

meta := loadFileMeta(paths)
flushable = newIngestedFlushable(
meta, d.opts.Comparer, d.newIters, d.tableNewRangeKeyIter,
)
flushable = newIngestedFlushable(meta, d.opts.Comparer, d.newIters, d.tableNewRangeKeyIter, KeyRange{})
return ""
case "iter":
iter := flushable.newIter(nil)
Expand Down
46 changes: 31 additions & 15 deletions ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -1162,7 +1162,7 @@ func (d *DB) Ingest(paths []string) error {
if d.opts.ReadOnly {
return ErrReadOnly
}
_, err := d.ingest(paths, ingestTargetLevel, nil /* shared */, KeyRange{}, nil /* external */)
_, err := d.ingest(paths, ingestTargetLevel, nil /* shared */, KeyRange{}, false, nil /* external */)
return err
}

Expand Down Expand Up @@ -1250,7 +1250,7 @@ func (d *DB) IngestWithStats(paths []string) (IngestOperationStats, error) {
if d.opts.ReadOnly {
return IngestOperationStats{}, ErrReadOnly
}
return d.ingest(paths, ingestTargetLevel, nil /* shared */, KeyRange{}, nil /* external */)
return d.ingest(paths, ingestTargetLevel, nil, KeyRange{}, false, nil)
}

// IngestExternalFiles does the same as IngestWithStats, and additionally
Expand All @@ -1268,7 +1268,7 @@ func (d *DB) IngestExternalFiles(external []ExternalFile) (IngestOperationStats,
if d.opts.Experimental.RemoteStorage == nil {
return IngestOperationStats{}, errors.New("pebble: cannot ingest external files without shared storage configured")
}
return d.ingest(nil, ingestTargetLevel, nil /* shared */, KeyRange{}, external)
return d.ingest(nil, ingestTargetLevel, nil, KeyRange{}, false, external)
}

// IngestAndExcise does the same as IngestWithStats, and additionally accepts a
Expand All @@ -1282,7 +1282,11 @@ func (d *DB) IngestExternalFiles(external []ExternalFile) (IngestOperationStats,
// Panics if this DB instance was not instantiated with a remote.Storage and
// shared sstables are present.
func (d *DB) IngestAndExcise(
paths []string, shared []SharedSSTMeta, external []ExternalFile, exciseSpan KeyRange,
paths []string,
shared []SharedSSTMeta,
external []ExternalFile,
exciseSpan KeyRange,
sstsContainExciseTombstone bool,
) (IngestOperationStats, error) {
if err := d.closed.Load(); err != nil {
panic(err)
Expand All @@ -1305,12 +1309,12 @@ func (d *DB) IngestAndExcise(
v, FormatMinForSharedObjects,
)
}
return d.ingest(paths, ingestTargetLevel, shared, exciseSpan, external)
return d.ingest(paths, ingestTargetLevel, shared, exciseSpan, sstsContainExciseTombstone, external)
}

// Both DB.mu and commitPipeline.mu must be held while this is called.
func (d *DB) newIngestedFlushableEntry(
meta []*fileMetadata, seqNum uint64, logNum base.DiskFileNum,
meta []*fileMetadata, seqNum uint64, logNum base.DiskFileNum, exciseSpan KeyRange,
) (*flushableEntry, error) {
// Update the sequence number for all of the sstables in the
// metadata. Writing the metadata to the manifest when the
Expand All @@ -1326,7 +1330,7 @@ func (d *DB) newIngestedFlushableEntry(
}
}

f := newIngestedFlushable(meta, d.opts.Comparer, d.newIters, d.tableNewRangeKeyIter)
f := newIngestedFlushable(meta, d.opts.Comparer, d.newIters, d.tableNewRangeKeyIter, exciseSpan)

// NB: The logNum/seqNum are the WAL number which we're writing this entry
// to and the sequence number within the WAL which we'll write this entry
Expand Down Expand Up @@ -1356,7 +1360,9 @@ func (d *DB) newIngestedFlushableEntry(
// we're holding both locks, the order in which we rotate the memtable or
// recycle the WAL in this function is irrelevant as long as the correct log
// numbers are assigned to the appropriate flushable.
func (d *DB) handleIngestAsFlushable(meta []*fileMetadata, seqNum uint64) error {
func (d *DB) handleIngestAsFlushable(
meta []*fileMetadata, seqNum uint64, exciseSpan KeyRange,
) error {
b := d.NewBatch()
for _, m := range meta {
b.ingestSST(m.FileNum)
Expand All @@ -1382,7 +1388,7 @@ func (d *DB) handleIngestAsFlushable(meta []*fileMetadata, seqNum uint64) error
d.mu.Lock()
}

entry, err := d.newIngestedFlushableEntry(meta, seqNum, logNum)
entry, err := d.newIngestedFlushableEntry(meta, seqNum, logNum, exciseSpan)
if err != nil {
return err
}
Expand Down Expand Up @@ -1411,6 +1417,7 @@ func (d *DB) handleIngestAsFlushable(meta []*fileMetadata, seqNum uint64) error
d.mu.mem.queue = append(d.mu.mem.queue, entry)
d.rotateMemtable(newLogNum, nextSeqNum, currMem)
d.updateReadStateLocked(d.opts.DebugCheck)
// TODO(aaditya): is this necessary? we call this already in rotateMemtable above
d.maybeScheduleFlush()
return nil
}
Expand All @@ -1421,6 +1428,7 @@ func (d *DB) ingest(
targetLevelFunc ingestTargetLevelFunc,
shared []SharedSSTMeta,
exciseSpan KeyRange,
sstsContainExciseTombstone bool,
external []ExternalFile,
) (IngestOperationStats, error) {
if len(shared) > 0 && d.opts.Experimental.RemoteStorage == nil {
Expand Down Expand Up @@ -1621,11 +1629,19 @@ func (d *DB) ingest(
mut.writerRef()
return
}
// The ingestion overlaps with some entry in the flushable queue.
if d.FormatMajorVersion() < FormatFlushableIngest ||
d.opts.Experimental.DisableIngestAsFlushable() ||
len(shared) > 0 || exciseSpan.Valid() || len(external) > 0 ||
(len(d.mu.mem.queue) > d.opts.MemTableStopWritesThreshold-1) {

// The ingestion overlaps with some entry in the flushable queue. If the
// pre-conditions are met below, we can treat this ingestion as a flushable
// ingest, otherwise we wait on the memtable flush before ingestion.
//
// TODO(aaditya): We should make flushableIngest compatible with remote
// files.
hasRemoteFiles := len(shared) > 0 || len(external) > 0
canIngestFlushable := d.FormatMajorVersion() >= FormatFlushableIngest &&
(len(d.mu.mem.queue) < d.opts.MemTableStopWritesThreshold) &&
!d.opts.Experimental.DisableIngestAsFlushable() && !hasRemoteFiles

if !canIngestFlushable || (exciseSpan.Valid() && !sstsContainExciseTombstone) {
// We're not able to ingest as a flushable,
// so we must synchronously flush.
//
Expand Down Expand Up @@ -1657,7 +1673,7 @@ func (d *DB) ingest(
for i := range fileMetas {
fileMetas[i] = loadResult.local[i].fileMetadata
}
err = d.handleIngestAsFlushable(fileMetas, seqNum)
err = d.handleIngestAsFlushable(fileMetas, seqNum, exciseSpan)
}

var ve *versionEdit
Expand Down
15 changes: 11 additions & 4 deletions ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -696,6 +696,9 @@ func TestExcise(t *testing.T) {

case "ingest-and-excise":
flushed = false
d.mu.Lock()
prevFlushableIngests := d.mu.versions.metrics.Flush.AsIngestCount
d.mu.Unlock()
if err := runIngestAndExciseCmd(td, d, mem); err != nil {
return err.Error()
}
Expand All @@ -704,8 +707,12 @@ func TestExcise(t *testing.T) {
for d.mu.compact.flushing {
d.mu.compact.cond.Wait()
}
flushableIngests := d.mu.versions.metrics.Flush.AsIngestCount
d.mu.Unlock()
if flushed {
if prevFlushableIngests < flushableIngests {
return "flushable ingest"
}
return "memtable flushed"
}
return ""
Expand Down Expand Up @@ -1091,7 +1098,7 @@ func testIngestSharedImpl(
require.NoError(t, err)
require.NoError(t, w.Close())

_, err = to.IngestAndExcise([]string{sstPath}, sharedSSTs, nil /* external */, KeyRange{Start: startKey, End: endKey})
_, err = to.IngestAndExcise([]string{sstPath}, sharedSSTs, nil /* external */, KeyRange{Start: startKey, End: endKey}, false)
require.NoError(t, err)
return fmt.Sprintf("replicated %d shared SSTs", len(sharedSSTs))

Expand Down Expand Up @@ -1320,7 +1327,7 @@ func TestSimpleIngestShared(t *testing.T) {
Level: 6,
Size: uint64(size + 5),
}
_, err = d.IngestAndExcise([]string{}, []SharedSSTMeta{sharedSSTMeta}, nil /* external */, KeyRange{Start: []byte("d"), End: []byte("ee")})
_, err = d.IngestAndExcise([]string{}, []SharedSSTMeta{sharedSSTMeta}, nil /* external */, KeyRange{Start: []byte("d"), End: []byte("ee")}, false)
require.NoError(t, err)

// TODO(bilal): Once reading of shared sstables is in, verify that the values
Expand Down Expand Up @@ -1582,7 +1589,7 @@ func TestConcurrentExcise(t *testing.T) {
require.NoError(t, err)
require.NoError(t, w.Close())

_, err = to.IngestAndExcise([]string{sstPath}, sharedSSTs, nil /* external */, KeyRange{Start: startKey, End: endKey})
_, err = to.IngestAndExcise([]string{sstPath}, sharedSSTs, nil, KeyRange{Start: startKey, End: endKey}, false)
require.NoError(t, err)
return fmt.Sprintf("replicated %d shared SSTs", len(sharedSSTs))

Expand Down Expand Up @@ -1991,7 +1998,7 @@ func TestIngestExternal(t *testing.T) {
)
require.NoError(t, err)
require.NoError(t, w.Close())
_, err = to.IngestAndExcise([]string{sstPath}, nil /* sharedSSTs */, externalFiles, KeyRange{Start: startKey, End: endKey})
_, err = to.IngestAndExcise([]string{sstPath}, nil /* shared */, externalFiles, KeyRange{Start: startKey, End: endKey}, false)
require.NoError(t, err)
return fmt.Sprintf("replicated %d external SSTs", len(externalFiles))

Expand Down
6 changes: 3 additions & 3 deletions metamorphic/ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -923,10 +923,10 @@ func (o *ingestAndExciseOp) run(t *Test, h historyRecorder) {

if t.testOpts.useExcise {
err = firstError(err, t.withRetries(func() error {
_, err := t.getDB(o.dbID).IngestAndExcise([]string{path}, nil /* sharedSSTs */, nil /* external */, pebble.KeyRange{
_, err := t.getDB(o.dbID).IngestAndExcise([]string{path}, nil /* shared */, nil /* external */, pebble.KeyRange{
Start: o.exciseStart,
End: o.exciseEnd,
})
}, false)
return err
}))
} else {
Expand Down Expand Up @@ -1988,7 +1988,7 @@ func (r *replicateOp) runSharedReplicate(
return
}

_, err = dest.IngestAndExcise([]string{sstPath}, sharedSSTs, nil /* external */, pebble.KeyRange{Start: r.start, End: r.end})
_, err = dest.IngestAndExcise([]string{sstPath}, sharedSSTs, nil /* external */, pebble.KeyRange{Start: r.start, End: r.end}, false)
h.Recordf("%s // %v", r, err)
}

Expand Down
4 changes: 1 addition & 3 deletions open.go
Original file line number Diff line number Diff line change
Expand Up @@ -936,9 +936,7 @@ func (d *DB) replayWAL(
panic("pebble: couldn't load all files in WAL entry.")
}

entry, err = d.newIngestedFlushableEntry(
meta, seqNum, base.DiskFileNum(ll.Num),
)
entry, err = d.newIngestedFlushableEntry(meta, seqNum, base.DiskFileNum(ll.Num), KeyRange{})
if err != nil {
return nil, 0, err
}
Expand Down
Loading

0 comments on commit fbfa1d2

Please sign in to comment.