ingest,compaction: use excise when flushing flushableIngest
This patch amends the ingest pipeline to allow certain cases to use
`flushableIngest` in order to avoid waiting on memtable flushes when
there is overlap. The current use case is range snapshots in Cockroach,
which include a `RANGEDEL`. When we use this path, we also pass the
`exciseSpan` down into the flushable so the excise is performed at
flush time, preserving the benefit of excise in the ingest path.

Fixes cockroachdb#3335.
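
A caller-side sketch of the amended entry point (hedged: `ingestSnapshot`, the package name, and the parameters are illustrative, not part of this change); the final argument opts into the flushable-ingest path when the ingestion overlaps the memtable queue:

	package replication // hypothetical package

	import "github.com/cockroachdb/pebble"

	// ingestSnapshot replicates a range snapshot: it ingests the sstables and
	// excises the old contents of the span in one call. Passing true for
	// doFlushableIngest asks Pebble to ride the flushable-ingest path instead
	// of waiting on a memtable flush when there is overlap.
	func ingestSnapshot(d *pebble.DB, paths []string, start, end []byte) error {
		span := pebble.KeyRange{Start: start, End: end}
		_, err := d.IngestAndExcise(paths, nil /* shared */, nil /* external */, span, true /* doFlushableIngest */)
		return err
	}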
aadityasondhi committed Mar 20, 2024
1 parent 8df4320 commit f3cc528
Showing 9 changed files with 227 additions and 53 deletions.
82 changes: 58 additions & 24 deletions compaction.go
@@ -1447,17 +1447,67 @@ func (d *DB) runIngestFlush(c *compaction) (*manifest.VersionEdit, error) {
var err error
var fileToSplit *fileMetadata
var ingestSplitFiles []ingestSplitFile
for _, file := range c.flushing[0].flushable.(*ingestedFlushable).files {
suggestSplit := d.opts.Experimental.IngestSplit != nil && d.opts.Experimental.IngestSplit() &&
d.FormatMajorVersion() >= FormatVirtualSSTables
level, fileToSplit, err = ingestTargetLevel(
d.newIters, d.tableNewRangeKeyIter, iterOpts, d.opts.Comparer,
c.version, baseLevel, d.mu.compact.inProgress, file.FileMetadata,
suggestSplit,
)
ingestFlushable := c.flushing[0].flushable.(*ingestedFlushable)

updateLevelMetricsOnExcise := func(m *fileMetadata, level int, added []newFileEntry) {
levelMetrics := c.metrics[level]
if levelMetrics == nil {
levelMetrics = &LevelMetrics{}
c.metrics[level] = levelMetrics
}
levelMetrics.NumFiles--
levelMetrics.Size -= int64(m.Size)
for i := range added {
levelMetrics.NumFiles++
levelMetrics.Size += int64(added[i].Meta.Size)
}
}

suggestSplit := d.opts.Experimental.IngestSplit != nil && d.opts.Experimental.IngestSplit() &&
d.FormatMajorVersion() >= FormatVirtualSSTables

replacedFiles := make(map[base.FileNum][]newFileEntry)
for _, file := range ingestFlushable.files {
if ingestFlushable.exciseSpan.Valid() && ingestFlushable.exciseSpan.Contains(d.cmp, file.FileMetadata.Smallest) && ingestFlushable.exciseSpan.Contains(d.cmp, file.FileMetadata.Largest) {
level = 6
} else {
level, fileToSplit, err = ingestTargetLevel(
d.newIters, d.tableNewRangeKeyIter, iterOpts, d.opts.Comparer,
c.version, baseLevel, d.mu.compact.inProgress, file.FileMetadata,
suggestSplit,
)
}

if ingestFlushable.exciseSpan.Valid() {
ve.DeletedFiles = map[manifest.DeletedFileEntry]*manifest.FileMetadata{}
// Iterate through all levels and find files that intersect with exciseSpan.
for level = range c.version.Levels {
overlaps := c.version.Overlaps(level, ingestFlushable.exciseSpan.Start, ingestFlushable.exciseSpan.End, true /* exclusiveEnd */)
iter := overlaps.Iter()

for m := iter.First(); m != nil; m = iter.Next() {
newFiles, err := d.excise(ingestFlushable.exciseSpan, m, ve, level)
if err != nil {
return nil, err
}

if _, ok := ve.DeletedFiles[deletedFileEntry{
Level: level,
FileNum: m.FileNum,
}]; !ok {
// We did not excise this file.
continue
}
replacedFiles[m.FileNum] = newFiles
updateLevelMetricsOnExcise(m, level, newFiles)
}
}
}
if err != nil {
return nil, err
}

// Add the current flushableIngest file to the version.
ve.NewFiles = append(ve.NewFiles, newFileEntry{Level: level, Meta: file.FileMetadata})
if fileToSplit != nil {
ingestSplitFiles = append(ingestSplitFiles, ingestSplitFile{
@@ -1475,23 +1525,7 @@ func (d *DB) runIngestFlush(c *compaction) (*manifest.VersionEdit, error) {
levelMetrics.TablesIngested++
}

updateLevelMetricsOnExcise := func(m *fileMetadata, level int, added []newFileEntry) {
levelMetrics := c.metrics[level]
if levelMetrics == nil {
levelMetrics = &LevelMetrics{}
c.metrics[level] = levelMetrics
}
levelMetrics.NumFiles--
levelMetrics.Size -= int64(m.Size)
for i := range added {
levelMetrics.NumFiles++
levelMetrics.Size += int64(added[i].Meta.Size)
}
}

if len(ingestSplitFiles) > 0 {
ve.DeletedFiles = make(map[manifest.DeletedFileEntry]*manifest.FileMetadata)
replacedFiles := make(map[base.FileNum][]newFileEntry)
if err := d.ingestSplit(ve, updateLevelMetricsOnExcise, ingestSplitFiles, replacedFiles); err != nil {
return nil, err
}
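
In `runIngestFlush` above, an ingested file that lies wholly inside the excise span skips `ingestTargetLevel` and is placed directly into L6, and existing files overlapping the span are excised with level metrics adjusted via `updateLevelMetricsOnExcise`. A sketch of the containment check, pulled out into a hypothetical in-package helper (the committed code inlines it):

	// fullyCoveredByExcise reports whether an ingested file sits entirely
	// inside the excise span. Such a file can only overlap data that the
	// excise is about to remove, so placing it straight into L6 is safe.
	func fullyCoveredByExcise(cmp Compare, span KeyRange, m *fileMetadata) bool {
		return span.Valid() &&
			span.Contains(cmp, m.Smallest) &&
			span.Contains(cmp, m.Largest)
	}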
5 changes: 4 additions & 1 deletion data_test.go
@@ -1250,6 +1250,7 @@ func (d *DB) waitTableStats() {

func runIngestAndExciseCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error {
var exciseSpan KeyRange
var doFlushableIngest bool
paths := make([]string, 0, len(td.CmdArgs))
for i, arg := range td.CmdArgs {
switch td.CmdArgs[i].Key {
@@ -1263,12 +1264,14 @@ func runIngestAndExciseCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error {
}
exciseSpan.Start = []byte(fields[0])
exciseSpan.End = []byte(fields[1])
case "flushable-ingest":
doFlushableIngest = true
default:
paths = append(paths, arg.String())
}
}

if _, err := d.IngestAndExcise(paths, nil /* shared */, nil /* external */, exciseSpan); err != nil {
if _, err := d.IngestAndExcise(paths, nil, nil, exciseSpan, doFlushableIngest); err != nil {
return err
}
return nil
5 changes: 5 additions & 0 deletions flushable.go
@@ -158,13 +158,17 @@ type ingestedFlushable struct {
slice manifest.LevelSlice
// hasRangeKeys is set on ingestedFlushable construction.
hasRangeKeys bool
// exciseSpan is populated if an excise operation should be performed during
// flush.
exciseSpan KeyRange
}

func newIngestedFlushable(
files []*fileMetadata,
comparer *Comparer,
newIters tableNewIters,
newRangeKeyIters keyspanimpl.TableNewSpanIter,
exciseSpan KeyRange,
) *ingestedFlushable {
var physicalFiles []physicalMeta
var hasRangeKeys bool
@@ -183,6 +187,7 @@ func newIngestedFlushable(
// slice is immutable and can be set once and used many times.
slice: manifest.NewLevelSliceKeySorted(comparer.Compare, files),
hasRangeKeys: hasRangeKeys,
exciseSpan: exciseSpan,
}

return ret
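
The `exciseSpan` field above is captured at construction and carried by the `ingestedFlushable` until flush. A hedged in-package sketch of the two construction modes (`metas` and the bounds are illustrative); a zero `KeyRange`, as used in WAL replay, disables the excise:

	// No excise: the zero KeyRange is not Valid(), so runIngestFlush skips the
	// excise step for this flushable.
	plain := newIngestedFlushable(metas, d.opts.Comparer, d.newIters, d.tableNewRangeKeyIter, KeyRange{})

	// With an excise span: the span is applied when this flushable is flushed.
	excising := newIngestedFlushable(metas, d.opts.Comparer, d.newIters, d.tableNewRangeKeyIter,
		KeyRange{Start: []byte("d"), End: []byte("ee")})

	_, _ = plain, excising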
4 changes: 1 addition & 3 deletions flushable_test.go
@@ -119,9 +119,7 @@ func TestIngestedSSTFlushableAPI(t *testing.T) {
}

meta := loadFileMeta(paths)
flushable = newIngestedFlushable(
meta, d.opts.Comparer, d.newIters, d.tableNewRangeKeyIter,
)
flushable = newIngestedFlushable(meta, d.opts.Comparer, d.newIters, d.tableNewRangeKeyIter, KeyRange{})
return ""
case "iter":
iter := flushable.newIter(nil)
47 changes: 32 additions & 15 deletions ingest.go
@@ -1101,7 +1101,7 @@ func (d *DB) Ingest(paths []string) error {
if d.opts.ReadOnly {
return ErrReadOnly
}
_, err := d.ingest(paths, ingestTargetLevel, nil /* shared */, KeyRange{}, nil /* external */)
_, err := d.ingest(paths, ingestTargetLevel, nil, KeyRange{}, false, nil)
return err
}

@@ -1188,7 +1188,7 @@ func (d *DB) IngestWithStats(paths []string) (IngestOperationStats, error) {
if d.opts.ReadOnly {
return IngestOperationStats{}, ErrReadOnly
}
return d.ingest(paths, ingestTargetLevel, nil /* shared */, KeyRange{}, nil /* external */)
return d.ingest(paths, ingestTargetLevel, nil, KeyRange{}, false, nil)
}

// IngestExternalFiles does the same as IngestWithStats, and additionally
@@ -1206,7 +1206,7 @@ func (d *DB) IngestExternalFiles(external []ExternalFile) (IngestOperationStats,
if d.opts.Experimental.RemoteStorage == nil {
return IngestOperationStats{}, errors.New("pebble: cannot ingest external files without shared storage configured")
}
return d.ingest(nil, ingestTargetLevel, nil /* shared */, KeyRange{}, external)
return d.ingest(nil, ingestTargetLevel, nil, KeyRange{}, false, external)
}

// IngestAndExcise does the same as IngestWithStats, and additionally accepts a
@@ -1220,7 +1220,11 @@ func (d *DB) IngestExternalFiles(external []ExternalFile) (IngestOperationStats,
// Panics if this DB instance was not instantiated with a remote.Storage and
// shared sstables are present.
func (d *DB) IngestAndExcise(
paths []string, shared []SharedSSTMeta, external []ExternalFile, exciseSpan KeyRange,
paths []string,
shared []SharedSSTMeta,
external []ExternalFile,
exciseSpan KeyRange,
doFlushableIngest bool,
) (IngestOperationStats, error) {
if err := d.closed.Load(); err != nil {
panic(err)
@@ -1243,12 +1247,12 @@ func (d *DB) IngestExternalFiles(external []ExternalFile) (IngestOperationStats,
v, FormatMinForSharedObjects,
)
}
return d.ingest(paths, ingestTargetLevel, shared, exciseSpan, external)
return d.ingest(paths, ingestTargetLevel, shared, exciseSpan, doFlushableIngest, external)
}

// Both DB.mu and commitPipeline.mu must be held while this is called.
func (d *DB) newIngestedFlushableEntry(
meta []*fileMetadata, seqNum uint64, logNum base.DiskFileNum,
meta []*fileMetadata, seqNum uint64, logNum base.DiskFileNum, exciseSpan KeyRange,
) (*flushableEntry, error) {
// Update the sequence number for all of the sstables in the
// metadata. Writing the metadata to the manifest when the
@@ -1264,7 +1268,7 @@ func (d *DB) newIngestedFlushableEntry(
}
}

f := newIngestedFlushable(meta, d.opts.Comparer, d.newIters, d.tableNewRangeKeyIter)
f := newIngestedFlushable(meta, d.opts.Comparer, d.newIters, d.tableNewRangeKeyIter, exciseSpan)

// NB: The logNum/seqNum are the WAL number which we're writing this entry
// to and the sequence number within the WAL which we'll write this entry
@@ -1294,7 +1298,9 @@ func (d *DB) newIngestedFlushableEntry(
// we're holding both locks, the order in which we rotate the memtable or
// recycle the WAL in this function is irrelevant as long as the correct log
// numbers are assigned to the appropriate flushable.
func (d *DB) handleIngestAsFlushable(meta []*fileMetadata, seqNum uint64) error {
func (d *DB) handleIngestAsFlushable(
meta []*fileMetadata, seqNum uint64, exciseSpan KeyRange,
) error {
b := d.NewBatch()
for _, m := range meta {
b.ingestSST(m.FileNum)
@@ -1320,7 +1326,7 @@ func (d *DB) handleIngestAsFlushable(meta []*fileMetadata, seqNum uint64) error
d.mu.Lock()
}

entry, err := d.newIngestedFlushableEntry(meta, seqNum, logNum)
entry, err := d.newIngestedFlushableEntry(meta, seqNum, logNum, exciseSpan)
if err != nil {
return err
}
@@ -1340,6 +1346,7 @@ func (d *DB) handleIngestAsFlushable(meta []*fileMetadata, seqNum uint64) error
}
}

d.mu.versions.metrics.Ingest.Count++
currMem := d.mu.mem.mutable
// NB: Placing ingested sstables above the current memtables
// requires rotating of the existing memtables/WAL. There is
@@ -1349,6 +1356,7 @@ func (d *DB) handleIngestAsFlushable(meta []*fileMetadata, seqNum uint64) error
d.mu.mem.queue = append(d.mu.mem.queue, entry)
d.rotateMemtable(newLogNum, nextSeqNum, currMem)
d.updateReadStateLocked(d.opts.DebugCheck)
// TODO(aaditya): is this necessary? we call this already in rotateMemtable above
d.maybeScheduleFlush()
return nil
}
@@ -1359,6 +1367,7 @@ func (d *DB) ingest(
targetLevelFunc ingestTargetLevelFunc,
shared []SharedSSTMeta,
exciseSpan KeyRange,
doFlushableIngest bool,
external []ExternalFile,
) (IngestOperationStats, error) {
if len(shared) > 0 && d.opts.Experimental.RemoteStorage == nil {
@@ -1553,11 +1562,19 @@ func (d *DB) ingest(
mut.writerRef()
return
}
// The ingestion overlaps with some entry in the flushable queue.
if d.FormatMajorVersion() < FormatFlushableIngest ||
d.opts.Experimental.DisableIngestAsFlushable() ||
len(shared) > 0 || exciseSpan.Valid() || len(external) > 0 ||
(len(d.mu.mem.queue) > d.opts.MemTableStopWritesThreshold-1) {

// The ingestion overlaps with some entry in the flushable queue. If the
// pre-conditions are met below, we can treat this ingestion as a flushable
// ingest, otherwise we wait on the memtable flush before ingestion.
//
// TODO(aaditya): We should make flushableIngest compatible with remote
// files.
hasRemoteFiles := len(shared) > 0 || len(external) > 0
canIngestFlushable := d.FormatMajorVersion() >= FormatFlushableIngest &&
(len(d.mu.mem.queue) < d.opts.MemTableStopWritesThreshold) &&
!d.opts.Experimental.DisableIngestAsFlushable() && !hasRemoteFiles

if !canIngestFlushable || (exciseSpan.Valid() && !doFlushableIngest) {
// We're not able to ingest as a flushable,
// so we must synchronously flush.
//
@@ -1589,7 +1606,7 @@ func (d *DB) ingest(
for i := range fileMetas {
fileMetas[i] = loadResult.local[i].fileMetadata
}
err = d.handleIngestAsFlushable(fileMetas, seqNum)
err = d.handleIngestAsFlushable(fileMetas, seqNum, exciseSpan)
}

var ve *versionEdit
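
The rewritten gate near the end of `ingest` decides between the flushable-ingest path and a synchronous flush: remote files always force the flush path, and a valid excise span takes the flushable path only when the caller passed `doFlushableIngest`. Restated as a hypothetical in-package predicate (the committed code inlines this check):

	// useFlushableIngest mirrors the condition in DB.ingest: it is the negation
	// of "!canIngestFlushable || (exciseSpan.Valid() && !doFlushableIngest)".
	func useFlushableIngest(
		d *DB, shared []SharedSSTMeta, external []ExternalFile,
		exciseSpan KeyRange, doFlushableIngest bool,
	) bool {
		hasRemoteFiles := len(shared) > 0 || len(external) > 0
		canIngestFlushable := d.FormatMajorVersion() >= FormatFlushableIngest &&
			len(d.mu.mem.queue) < d.opts.MemTableStopWritesThreshold &&
			!d.opts.Experimental.DisableIngestAsFlushable() &&
			!hasRemoteFiles
		return canIngestFlushable && (!exciseSpan.Valid() || doFlushableIngest)
	}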
15 changes: 11 additions & 4 deletions ingest_test.go
@@ -696,6 +696,9 @@ func TestExcise(t *testing.T) {

case "ingest-and-excise":
flushed = false
d.mu.Lock()
prevFlushableIngests := d.mu.versions.metrics.Flush.AsIngestCount
d.mu.Unlock()
if err := runIngestAndExciseCmd(td, d, mem); err != nil {
return err.Error()
}
@@ -704,8 +707,12 @@
for d.mu.compact.flushing {
d.mu.compact.cond.Wait()
}
flushableIngests := d.mu.versions.metrics.Flush.AsIngestCount
d.mu.Unlock()
if flushed {
if prevFlushableIngests < flushableIngests {
return "flushable ingest"
}
return "memtable flushed"
}
return ""
@@ -1090,7 +1097,7 @@ func testIngestSharedImpl(
require.NoError(t, err)
require.NoError(t, w.Close())

_, err = to.IngestAndExcise([]string{sstPath}, sharedSSTs, nil /* external */, KeyRange{Start: startKey, End: endKey})
_, err = to.IngestAndExcise([]string{sstPath}, sharedSSTs, nil, KeyRange{Start: startKey, End: endKey}, false)
require.NoError(t, err)
return fmt.Sprintf("replicated %d shared SSTs", len(sharedSSTs))

@@ -1318,7 +1325,7 @@ func TestSimpleIngestShared(t *testing.T) {
Level: 6,
Size: uint64(size + 5),
}
_, err = d.IngestAndExcise([]string{}, []SharedSSTMeta{sharedSSTMeta}, nil /* external */, KeyRange{Start: []byte("d"), End: []byte("ee")})
_, err = d.IngestAndExcise([]string{}, []SharedSSTMeta{sharedSSTMeta}, nil, KeyRange{Start: []byte("d"), End: []byte("ee")}, false)
require.NoError(t, err)

// TODO(bilal): Once reading of shared sstables is in, verify that the values
@@ -1579,7 +1586,7 @@ func TestConcurrentExcise(t *testing.T) {
require.NoError(t, err)
require.NoError(t, w.Close())

_, err = to.IngestAndExcise([]string{sstPath}, sharedSSTs, nil /* external */, KeyRange{Start: startKey, End: endKey})
_, err = to.IngestAndExcise([]string{sstPath}, sharedSSTs, nil, KeyRange{Start: startKey, End: endKey}, false)
require.NoError(t, err)
return fmt.Sprintf("replicated %d shared SSTs", len(sharedSSTs))

@@ -1980,7 +1987,7 @@ func TestIngestExternal(t *testing.T) {
)
require.NoError(t, err)
require.NoError(t, w.Close())
_, err = to.IngestAndExcise([]string{sstPath}, nil /* sharedSSTs */, externalFiles, KeyRange{Start: startKey, End: endKey})
_, err = to.IngestAndExcise([]string{sstPath}, nil, externalFiles, KeyRange{Start: startKey, End: endKey}, false)
require.NoError(t, err)
return fmt.Sprintf("replicated %d external SSTs", len(externalFiles))

6 changes: 3 additions & 3 deletions metamorphic/ops.go
@@ -875,10 +875,10 @@ func (o *ingestAndExciseOp) run(t *Test, h historyRecorder) {

if t.testOpts.useExcise {
err = firstError(err, t.withRetries(func() error {
_, err := t.getDB(o.dbID).IngestAndExcise([]string{path}, nil /* sharedSSTs */, nil /* external */, pebble.KeyRange{
_, err := t.getDB(o.dbID).IngestAndExcise([]string{path}, nil, nil, pebble.KeyRange{
Start: o.exciseStart,
End: o.exciseEnd,
})
}, false)
return err
}))
} else {
@@ -1945,7 +1945,7 @@ func (r *replicateOp) runSharedReplicate(
return
}

_, err = dest.IngestAndExcise([]string{sstPath}, sharedSSTs, nil /* external */, pebble.KeyRange{Start: r.start, End: r.end})
_, err = dest.IngestAndExcise([]string{sstPath}, sharedSSTs, nil, pebble.KeyRange{Start: r.start, End: r.end}, false)
h.Recordf("%s // %v", r, err)
}

4 changes: 1 addition & 3 deletions open.go
@@ -933,9 +933,7 @@ func (d *DB) replayWAL(
panic("pebble: couldn't load all files in WAL entry.")
}

entry, err = d.newIngestedFlushableEntry(
meta, seqNum, base.DiskFileNum(ll.Num),
)
entry, err = d.newIngestedFlushableEntry(meta, seqNum, base.DiskFileNum(ll.Num), KeyRange{})
if err != nil {
return nil, 0, err
}