Skip to content

Commit

Permalink
allow replication of external files
Browse files Browse the repository at this point in the history
This commit makes three changes:

1. The ability to iterate over external files.

2. Allowing a mix of local and remote files at the _first_ level where
   remote files are present when iterating.

3. Supporting importing External files in the same way that we do for
   SharedSSTs.

Like the sharedFileVisitor, the new externalFileVisitor allows the
caller to process external files directly.

In this PR, the externalFileVisitor only applies to L6, and L6 must
consist entirely of external or shared files.
  • Loading branch information
stevendanna committed Mar 6, 2024
1 parent f3e3d4a commit 9591f93
Show file tree
Hide file tree
Showing 12 changed files with 701 additions and 102 deletions.
2 changes: 1 addition & 1 deletion data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1268,7 +1268,7 @@ func runIngestAndExciseCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error {
}
}

if _, err := d.IngestAndExcise(paths, nil /* shared */, exciseSpan); err != nil {
if _, err := d.IngestAndExcise(paths, nil /* shared */, nil /* external */, exciseSpan); err != nil {
return err
}
return nil
Expand Down
13 changes: 7 additions & 6 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -1234,14 +1234,15 @@ func (d *DB) ScanInternal(
visitRangeDel func(start, end []byte, seqNum uint64) error,
visitRangeKey func(start, end []byte, keys []rangekey.Key) error,
visitSharedFile func(sst *SharedSSTMeta) error,
visitExternalFile func(sst *ExternalFile) error,
) error {
scanInternalOpts := &scanInternalOptions{
CategoryAndQoS: categoryAndQoS,
visitPointKey: visitPointKey,
visitRangeDel: visitRangeDel,
visitRangeKey: visitRangeKey,
visitSharedFile: visitSharedFile,
skipSharedLevels: visitSharedFile != nil,
CategoryAndQoS: categoryAndQoS,
visitPointKey: visitPointKey,
visitRangeDel: visitRangeDel,
visitRangeKey: visitRangeKey,
visitSharedFile: visitSharedFile,
visitExternalFile: visitExternalFile,
IterOptions: IterOptions{
KeyTypes: IterKeyTypePointsAndRanges,
LowerBound: lower,
Expand Down
88 changes: 72 additions & 16 deletions ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package pebble

import (
"context"
"fmt"
"slices"
"sort"
"time"
Expand Down Expand Up @@ -394,6 +395,8 @@ type ingestLoadResult struct {
local []ingestLocalMeta
shared []ingestSharedMeta
external []ingestExternalMeta

externalFilesHaveLevel bool
}

type ingestLocalMeta struct {
Expand Down Expand Up @@ -481,6 +484,14 @@ func ingestLoad(
fileMetadata: m,
external: external[i],
})
if external[i].Level > 0 {
if i != 0 && !result.externalFilesHaveLevel {
return ingestLoadResult{}, errors.AssertionFailedf("pebble: external sstables must all have level set or unset")
}
result.externalFilesHaveLevel = true
} else if result.externalFilesHaveLevel {
return ingestLoadResult{}, errors.AssertionFailedf("pebble: external sstables must all have level set or unset")
}
}
return result, nil
}
Expand All @@ -493,12 +504,22 @@ func ingestSortAndVerify(cmp Compare, lr ingestLoadResult, exciseSpan KeyRange)
return errors.Newf("pebble: shared file outside of excise span, span [%s-%s), file = %s", exciseSpan.Start, exciseSpan.End, f.String())
}
}

if lr.externalFilesHaveLevel {
for _, f := range lr.external {
if !exciseSpan.Contains(cmp, f.Smallest) || !exciseSpan.Contains(cmp, f.Largest) {
return errors.AssertionFailedf("pebble: external file outside of excise span, span [%s-%s), file = %s", exciseSpan.Start, exciseSpan.End, f.String())
}
}
}

if len(lr.external) > 0 {
if len(lr.local) > 0 || len(lr.shared) > 0 {
// Currently we only support external ingests on their own. If external
// files are present alongside local/shared files, return an error.
return errors.Newf("pebble: external files cannot be ingested atomically alongside other types of files")
if len(lr.shared) > 0 {
// If external files are present alongside shared files,
// return an error.
return errors.AssertionFailedf("pebble: external files cannot be ingested atomically alongside shared files")
}

// Sort according to the smallest key.
slices.SortFunc(lr.external, func(a, b ingestExternalMeta) int {
return cmp(a.Smallest.UserKey, b.Smallest.UserKey)
Expand Down Expand Up @@ -535,6 +556,11 @@ func ingestSortAndVerify(cmp Compare, lr ingestLoadResult, exciseSpan KeyRange)
filesInLevel = append(filesInLevel, lr.shared[i].fileMetadata)
}
}
for i := range lr.external {
if lr.external[i].external.Level == uint8(l) {
filesInLevel = append(filesInLevel, lr.external[i].fileMetadata)
}
}
slices.SortFunc(filesInLevel, func(a, b *fileMetadata) int {
return cmp(a.Smallest.UserKey, b.Smallest.UserKey)
})
Expand Down Expand Up @@ -697,14 +723,14 @@ func ingestUpdateSeqNum(
}
seqNum++
}
for i := range loadResult.local {
if err := setSeqNumInMetadata(loadResult.local[i].fileMetadata, seqNum, cmp, format); err != nil {
for i := range loadResult.external {
if err := setSeqNumInMetadata(loadResult.external[i].fileMetadata, seqNum, cmp, format); err != nil {
return err
}
seqNum++
}
for i := range loadResult.external {
if err := setSeqNumInMetadata(loadResult.external[i].fileMetadata, seqNum, cmp, format); err != nil {
for i := range loadResult.local {
if err := setSeqNumInMetadata(loadResult.local[i].fileMetadata, seqNum, cmp, format); err != nil {
return err
}
seqNum++
Expand Down Expand Up @@ -1146,6 +1172,28 @@ type ExternalFile struct {
// backing sst (the entire sst, not just the part restricted to Bounds).
// - the backing sst must not contain multiple keys with the same prefix.
SyntheticSuffix []byte

// Level denotes the level at which this file was present at read time
// if the external file was returned by a scan of an existing Pebble
// instance. If Level is 0, this field is ignored.
Level uint8
}

// cloneFromFileMeta overwrites *e with a new ExternalFile built from f.
//
// Every byte slice taken from f (the bounds, the synthetic suffix and, when a
// prefix replacement is present, the content and synthetic prefixes) is copied
// so that the result does not alias f's memory. Any ExternalFile field that is
// not populated here, Level included, is reset to its zero value.
func (e *ExternalFile) cloneFromFileMeta(f *fileMetadata) {
	// copyBytes returns a copy of b backed by freshly allocated storage; an
	// empty or nil b yields nil.
	copyBytes := func(b []byte) []byte {
		return append([]byte(nil), b...)
	}

	cloned := ExternalFile{
		HasPointKey:     f.HasPointKeys,
		HasRangeKey:     f.HasRangeKeys,
		Size:            f.Size,
		SyntheticSuffix: copyBytes(f.SyntheticSuffix),
	}
	cloned.Bounds.Start = copyBytes(f.Smallest.UserKey)
	cloned.Bounds.End = copyBytes(f.Largest.UserKey)

	// The prefix fields are only meaningful when f carries a prefix
	// replacement; otherwise they stay nil.
	if f.PrefixReplacement != nil {
		cloned.ContentPrefix = copyBytes(f.PrefixReplacement.ContentPrefix)
		cloned.SyntheticPrefix = copyBytes(f.PrefixReplacement.SyntheticPrefix)
	}

	*e = cloned
}

// IngestWithStats does the same as Ingest, and additionally returns
Expand Down Expand Up @@ -1189,7 +1237,7 @@ func (d *DB) IngestExternalFiles(external []ExternalFile) (IngestOperationStats,
// Panics if this DB instance was not instantiated with a remote.Storage and
// shared sstables are present.
func (d *DB) IngestAndExcise(
paths []string, shared []SharedSSTMeta, exciseSpan KeyRange,
paths []string, shared []SharedSSTMeta, external []ExternalFile, exciseSpan KeyRange,
) (IngestOperationStats, error) {
if err := d.closed.Load(); err != nil {
panic(err)
Expand All @@ -1212,7 +1260,7 @@ func (d *DB) IngestAndExcise(
v, FormatMinForSharedObjects,
)
}
return d.ingest(paths, ingestTargetLevel, shared, exciseSpan, nil /* external */)
return d.ingest(paths, ingestTargetLevel, shared, exciseSpan, external)
}

// Both DB.mu and commitPipeline.mu must be held while this is called.
Expand Down Expand Up @@ -2169,7 +2217,8 @@ func (d *DB) ingestApply(
// overlap any existing files in the level.
var m *fileMetadata
sharedIdx := -1
sharedLevel := -1
externalIdx := -1
specifiedLevel := -1
externalFile := false
if i < len(lr.local) {
// local file.
Expand All @@ -2178,20 +2227,26 @@ func (d *DB) ingestApply(
// shared file.
sharedIdx = i - len(lr.local)
m = lr.shared[sharedIdx].fileMetadata
sharedLevel = int(lr.shared[sharedIdx].shared.Level)
specifiedLevel = int(lr.shared[sharedIdx].shared.Level)
} else {
// external file.
externalFile = true
m = lr.external[i-(len(lr.local)+len(lr.shared))].fileMetadata
externalIdx = i - (len(lr.local) + len(lr.shared))
m = lr.external[externalIdx].fileMetadata
specifiedLevel = int(lr.external[externalIdx].external.Level)
}
f := &ve.NewFiles[i]
var err error
if sharedIdx >= 0 {
f.Level = sharedLevel
f.Level = specifiedLevel
if f.Level < sharedLevelsStart {
panic("cannot slot a shared file higher than the highest shared level")
panic(fmt.Sprintf("cannot slot a shared file higher than the highest shared level: %d < %d",
f.Level, sharedLevelsStart))
}
ve.CreatedBackingTables = append(ve.CreatedBackingTables, m.FileBacking)
} else if externalFile && lr.externalFilesHaveLevel {
f.Level = specifiedLevel
ve.CreatedBackingTables = append(ve.CreatedBackingTables, m.FileBacking)
} else {
if externalFile {
ve.CreatedBackingTables = append(ve.CreatedBackingTables, m.FileBacking)
Expand All @@ -2200,7 +2255,7 @@ func (d *DB) ingestApply(
if exciseSpan.Valid() && exciseSpan.Contains(d.cmp, m.Smallest) && exciseSpan.Contains(d.cmp, m.Largest) {
// This file fits perfectly within the excise span. We can slot it at
// L6, or sharedLevelsStart - 1 if we have shared files.
if len(lr.shared) > 0 {
if len(lr.shared) > 0 || lr.externalFilesHaveLevel {
f.Level = sharedLevelsStart - 1
if baseLevel > f.Level {
f.Level = 0
Expand Down Expand Up @@ -2350,6 +2405,7 @@ func (d *DB) ingestApply(
}
}
}

if err := d.mu.versions.logAndApply(jobID, ve, metrics, false /* forceRotation */, func() []compactionInfo {
return d.getInProgressCompactionInfoLocked(nil)
}); err != nil {
Expand Down
Loading

0 comments on commit 9591f93

Please sign in to comment.