Skip to content

Commit

Permalink
ingest: use FileNum instead of DiskFileNum when loading tables
Browse files Browse the repository at this point in the history
We pass around pending `FileNum`s instead of `DiskFileNum`s, which
better matches the usage.
  • Loading branch information
RaduBerinde committed Jan 23, 2024
1 parent 5b09251 commit 5fec7ea
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 34 deletions.
4 changes: 2 additions & 2 deletions flushable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ func TestIngestedSSTFlushableAPI(t *testing.T) {

loadFileMeta := func(paths []string) []*fileMetadata {
d.mu.Lock()
pendingOutputs := make([]base.DiskFileNum, len(paths))
pendingOutputs := make([]base.FileNum, len(paths))
for i := range paths {
pendingOutputs[i] = d.mu.versions.getNextDiskFileNum()
pendingOutputs[i] = d.mu.versions.getNextFileNum()
}
jobID := d.mu.nextJobID
d.mu.nextJobID++
Expand Down
54 changes: 28 additions & 26 deletions ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func ingestValidateKey(opts *Options, key *InternalKey) error {
// ingestSynthesizeShared constructs a fileMetadata for one shared sstable owned
// or shared by another node.
func ingestSynthesizeShared(
opts *Options, sm SharedSSTMeta, fileNum base.DiskFileNum,
opts *Options, sm SharedSSTMeta, fileNum base.FileNum,
) (*fileMetadata, error) {
if sm.Size == 0 {
// Disallow 0 file sizes
Expand All @@ -108,14 +108,14 @@ func ingestSynthesizeShared(
// Don't load table stats. Doing a round trip to shared storage, one SST
// at a time is not worth it as it slows down ingestion.
meta := &fileMetadata{
// For simplicity, we use the same number for both the FileNum and the
// DiskFileNum (even though this is a virtual sstable).
FileNum: FileNum(fileNum),
FileNum: fileNum,
CreationTime: time.Now().Unix(),
Virtual: true,
Size: sm.Size,
}
meta.InitProviderBacking(fileNum)
// For simplicity, we use the same number for both the FileNum and the
// DiskFileNum (even though this is a virtual sstable).
meta.InitProviderBacking(base.DiskFileNum(fileNum))
// Set the underlying FileBacking's size to the same size as the virtualized
// view of the sstable. This ensures that we don't over-prioritize this
// sstable for compaction just yet, as we do not have a clear sense of what
Expand Down Expand Up @@ -166,11 +166,7 @@ func ingestSynthesizeShared(
// ingestLoad1External loads the fileMetadata for one external sstable.
// Sequence number and target level calculation happens during prepare/apply.
func ingestLoad1External(
opts *Options,
e ExternalFile,
fileNum base.DiskFileNum,
objProvider objstorage.Provider,
jobID int,
opts *Options, e ExternalFile, fileNum base.FileNum, objProvider objstorage.Provider, jobID int,
) (*fileMetadata, error) {
if e.Size == 0 {
// Disallow 0 file sizes
Expand All @@ -182,22 +178,22 @@ func ingestLoad1External(
// Don't load table stats. Doing a round trip to shared storage, one SST
// at a time is not worth it as it slows down ingestion.
meta := &fileMetadata{
// For simplicity, we use the same number for both the FileNum and the
// DiskFileNum (even though this is a virtual sstable).
FileNum: FileNum(fileNum),
FileNum: fileNum,
CreationTime: time.Now().Unix(),
Virtual: true,
Size: e.Size,
}
meta.InitProviderBacking(fileNum)
// For simplicity, we use the same number for both the FileNum and the
// DiskFileNum (even though this is a virtual sstable).
meta.InitProviderBacking(base.DiskFileNum(fileNum))

// Try to resolve a reference to the external file.
backing, err := objProvider.CreateExternalObjectBacking(e.Locator, e.ObjName)
if err != nil {
return nil, err
}
metas, err := objProvider.AttachRemoteObjects([]objstorage.RemoteObjectToAttach{{
FileNum: fileNum,
FileNum: meta.FileBacking.DiskFileNum,
FileType: fileTypeTable,
Backing: backing,
}})
Expand All @@ -209,7 +205,7 @@ func ingestLoad1External(
JobID: jobID,
Reason: "ingesting",
Path: objProvider.Path(metas[0]),
FileNum: fileNum,
FileNum: meta.FileBacking.DiskFileNum,
})
}
// In the name of keeping this ingestion as fast as possible, we avoid
Expand All @@ -220,12 +216,18 @@ func ingestLoad1External(
largestCopy := make([]byte, len(e.LargestUserKey))
copy(largestCopy, e.LargestUserKey)
if e.HasPointKey {
meta.ExtendPointKeyBounds(opts.Comparer.Compare, base.MakeInternalKey(smallestCopy, 0, InternalKeyKindMax),
base.MakeRangeDeleteSentinelKey(largestCopy))
meta.ExtendPointKeyBounds(
opts.Comparer.Compare,
base.MakeInternalKey(smallestCopy, 0, InternalKeyKindMax),
base.MakeRangeDeleteSentinelKey(largestCopy),
)
}
if e.HasRangeKey {
meta.ExtendRangeKeyBounds(opts.Comparer.Compare, base.MakeInternalKey(smallestCopy, 0, InternalKeyKindRangeKeySet),
base.MakeExclusiveSentinelKey(InternalKeyKindRangeKeyDelete, largestCopy))
meta.ExtendRangeKeyBounds(
opts.Comparer.Compare,
base.MakeInternalKey(smallestCopy, 0, InternalKeyKindRangeKeyMax),
base.MakeExclusiveSentinelKey(InternalKeyKindRangeKeyMin, largestCopy),
)
}

// Set the underlying FileBacking's size to the same size as the virtualized
Expand Down Expand Up @@ -254,9 +256,9 @@ func ingestLoad1(
fmv FormatMajorVersion,
readable objstorage.Readable,
cacheID uint64,
fileNum base.DiskFileNum,
fileNum base.FileNum,
) (*fileMetadata, error) {
cacheOpts := private.SSTableCacheOpts(cacheID, fileNum).(sstable.ReaderOption)
cacheOpts := private.SSTableCacheOpts(cacheID, base.PhysicalTableDiskFileNum(fileNum)).(sstable.ReaderOption)
r, err := sstable.NewReader(readable, opts.MakeReaderOptions(), cacheOpts)
if err != nil {
return nil, err
Expand All @@ -276,7 +278,7 @@ func ingestLoad1(
}

meta := &fileMetadata{}
meta.FileNum = base.PhysicalTableFileNum(fileNum)
meta.FileNum = fileNum
meta.Size = uint64(readable.Size())
meta.CreationTime = time.Now().Unix()
meta.InitPhysicalBacking()
Expand Down Expand Up @@ -407,7 +409,7 @@ func ingestLoad(
shared []SharedSSTMeta,
external []ExternalFile,
cacheID uint64,
pending []base.DiskFileNum,
pending []base.FileNum,
objProvider objstorage.Provider,
jobID int,
) (ingestLoadResult, error) {
Expand Down Expand Up @@ -1326,9 +1328,9 @@ func (d *DB) ingest(
// ordering. The sorting of L0 tables by sequence number avoids relying on
// that (busted) invariant.
d.mu.Lock()
pendingOutputs := make([]base.DiskFileNum, len(paths)+len(shared)+len(external))
pendingOutputs := make([]base.FileNum, len(paths)+len(shared)+len(external))
for i := 0; i < len(paths)+len(shared)+len(external); i++ {
pendingOutputs[i] = d.mu.versions.getNextDiskFileNum()
pendingOutputs[i] = d.mu.versions.getNextFileNum()
}

jobID := d.mu.nextJobID
Expand Down
10 changes: 5 additions & 5 deletions ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func TestIngestLoad(t *testing.T) {
Comparer: DefaultComparer,
FS: mem,
}).WithFSDefaults()
lr, err := ingestLoad(opts, dbVersion, []string{"ext"}, nil, nil, 0, []base.DiskFileNum{base.DiskFileNum(1)}, nil, 0)
lr, err := ingestLoad(opts, dbVersion, []string{"ext"}, nil, nil, 0, []base.FileNum{1}, nil, 0)
if err != nil {
return err.Error()
}
Expand Down Expand Up @@ -161,13 +161,13 @@ func TestIngestLoadRand(t *testing.T) {
}

paths := make([]string, 1+rng.Intn(10))
pending := make([]base.DiskFileNum, len(paths))
pending := make([]base.FileNum, len(paths))
expected := make([]*fileMetadata, len(paths))
for i := range paths {
paths[i] = fmt.Sprint(i)
pending[i] = base.DiskFileNum(rng.Uint64())
pending[i] = base.FileNum(rng.Uint64())
expected[i] = &fileMetadata{
FileNum: base.PhysicalTableFileNum(pending[i]),
FileNum: pending[i],
}
expected[i].StatsMarkValid()

Expand Down Expand Up @@ -235,7 +235,7 @@ func TestIngestLoadInvalid(t *testing.T) {
Comparer: DefaultComparer,
FS: mem,
}).WithFSDefaults()
if _, err := ingestLoad(opts, internalFormatNewest, []string{"invalid"}, nil, nil, 0, []base.DiskFileNum{base.DiskFileNum(1)}, nil, 0); err == nil {
if _, err := ingestLoad(opts, internalFormatNewest, []string{"invalid"}, nil, nil, 0, []base.FileNum{1}, nil, 0); err == nil {
t.Fatalf("expected error, but found success")
}
}
Expand Down
2 changes: 2 additions & 0 deletions internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ const (
InternalKeyKindRangeKeySet = base.InternalKeyKindRangeKeySet
InternalKeyKindRangeKeyUnset = base.InternalKeyKindRangeKeyUnset
InternalKeyKindRangeKeyDelete = base.InternalKeyKindRangeKeyDelete
InternalKeyKindRangeKeyMin = base.InternalKeyKindRangeKeyMin
InternalKeyKindRangeKeyMax = base.InternalKeyKindRangeKeyMax
InternalKeyKindIngestSST = base.InternalKeyKindIngestSST
InternalKeyKindDeleteSized = base.InternalKeyKindDeleteSized
InternalKeyKindInvalid = base.InternalKeyKindInvalid
Expand Down
3 changes: 3 additions & 0 deletions internal/base/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ const (
InternalKeyKindRangeKeyUnset InternalKeyKind = 20
InternalKeyKindRangeKeySet InternalKeyKind = 21

InternalKeyKindRangeKeyMin InternalKeyKind = InternalKeyKindRangeKeyDelete
InternalKeyKindRangeKeyMax InternalKeyKind = InternalKeyKindRangeKeySet

// InternalKeyKindIngestSST is used to distinguish a batch that corresponds to
// the WAL entry for ingested sstables that are added to the flushable
// queue. This InternalKeyKind cannot appear, amongst other key kinds in a
Expand Down
2 changes: 1 addition & 1 deletion open.go
Original file line number Diff line number Diff line change
Expand Up @@ -929,7 +929,7 @@ func (d *DB) replayWAL(
}
}
// NB: ingestLoad1 will close readable.
meta[i], err = ingestLoad1(d.opts, d.FormatMajorVersion(), readable, d.cacheID, n)
meta[i], err = ingestLoad1(d.opts, d.FormatMajorVersion(), readable, d.cacheID, base.PhysicalTableFileNum(n))
if err != nil {
return nil, 0, errors.Wrap(err, "pebble: error when loading flushable ingest files")
}
Expand Down

0 comments on commit 5fec7ea

Please sign in to comment.