Skip to content

Commit

Permalink
db: add ingest-time splitting of ssts into virtual ones
Browse files Browse the repository at this point in the history
Currently, if we identify boundary overlap in a level
during ingest target level calculation, but no data overlap,
we are forced to find a target level above the file we saw
the overlap with (if we can't fall below it, such as if the
existing file is in L6, which happens commonly).

This change takes advantage of virtual sstables to split
existing sstables into two virtual sstables when an ingested
sstable would be able to go into the same level had the sstables
been split that way to begin with. Doing this split reduces a
lot of write-amp as it avoids us from having to compact the
newly-ingested sstable with the sstable it boundary-overlapped with.

Biggest part of #1683. First commit is #2538, which this shares
a lot of logic with (mostly just the excise function).
  • Loading branch information
itsbilal committed Aug 10, 2023
1 parent 58a05cd commit d5eaf3c
Show file tree
Hide file tree
Showing 7 changed files with 506 additions and 33 deletions.
54 changes: 53 additions & 1 deletion compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -1880,15 +1880,27 @@ func (d *DB) runIngestFlush(c *compaction) (*manifest.VersionEdit, error) {
ve := &versionEdit{}
var level int
var err error
var fileToSplit *fileMetadata
var ingestSplitFiles []ingestSplitFile
for _, file := range c.flushing[0].flushable.(*ingestedFlushable).files {
level, err = ingestTargetLevel(
suggestSplit := d.opts.Experimental.IngestSplit != nil && d.opts.Experimental.IngestSplit() &&
d.FormatMajorVersion() >= ExperimentalFormatVirtualSSTables
level, fileToSplit, err = ingestTargetLevel(
d.newIters, d.tableNewRangeKeyIter, iterOpts, d.cmp,
c.version, baseLevel, d.mu.compact.inProgress, file.FileMetadata,
suggestSplit,
)
if err != nil {
return nil, err
}
ve.NewFiles = append(ve.NewFiles, newFileEntry{Level: level, Meta: file.FileMetadata})
if fileToSplit != nil {
ingestSplitFiles = append(ingestSplitFiles, ingestSplitFile{
ingestFile: file.FileMetadata,
splitFile: fileToSplit,
level: level,
})
}
levelMetrics := c.metrics[level]
if levelMetrics == nil {
levelMetrics = &LevelMetrics{}
Expand All @@ -1900,6 +1912,28 @@ func (d *DB) runIngestFlush(c *compaction) (*manifest.VersionEdit, error) {
levelMetrics.TablesIngested++
}

updateLevelMetricsOnExcise := func(m *fileMetadata, level int, added []newFileEntry) {
levelMetrics := c.metrics[level]
if levelMetrics == nil {
levelMetrics = &LevelMetrics{}
c.metrics[level] = levelMetrics
}
levelMetrics.NumFiles--
levelMetrics.Size -= int64(m.Size)
for i := range added {
levelMetrics.NumFiles++
levelMetrics.Size += int64(added[i].Meta.Size)
}
}

if len(ingestSplitFiles) > 0 {
ve.DeletedFiles = make(map[manifest.DeletedFileEntry]*manifest.FileMetadata)
replacedFiles := make(map[base.FileNum][]newFileEntry)
if err := d.ingestSplit(ve, updateLevelMetricsOnExcise, ingestSplitFiles, KeyRange{}, replacedFiles); err != nil {
return nil, err
}
}

return ve, nil
}

Expand Down Expand Up @@ -2069,6 +2103,24 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
metrics.BytesIn += d.mu.mem.queue[i].logSize
}
}
} else if len(ve.DeletedFiles) > 0 {
// c.kind == compactionKindIngestedFlushable && we have deleted files due
// to ingest-time splits.
//
// Iterate through all other compactions, and check if their inputs have
// been replaced due to an ingest-time split. In that case, cancel the
// compaction.
for c2 := range d.mu.compact.inProgress {
for i := range c2.inputs {
iter := c2.inputs[i].files.Iter()
for f := iter.First(); f != nil; f = iter.Next() {
if _, ok := ve.DeletedFiles[deletedFileEntry{FileNum: f.FileNum, Level: c2.inputs[i].level}]; ok {
c2.cancel.Store(true)
break
}
}
}
}
}
err = d.mu.versions.logAndApply(jobID, ve, c.metrics, false, /* forceRotation */
func() []compactionInfo { return d.getInProgressCompactionInfoLocked(c) })
Expand Down
5 changes: 3 additions & 2 deletions data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1354,8 +1354,9 @@ func runForceIngestCmd(td *datadriven.TestData, d *DB) error {
int,
map[*compaction]struct{},
*fileMetadata,
) (int, error) {
return level, nil
bool,
) (int, *fileMetadata, error) {
return level, nil, nil
}, nil /* shared */, KeyRange{}, nil /* external */)
return err
}
Expand Down
Loading

0 comments on commit d5eaf3c

Please sign in to comment.