db: change flushable ingest key to encode just the FileNum
The `InternalKeyKindIngestSST` key contains the file path of an SST
object. We don't normally store paths in metadata, as doing so is fairly
fragile (e.g. what if the store is moved?).

Since this is always an SST object that we created (usually through
hard-link), we can store just the FileNum.

This also opens up the door for this object to be potentially stored
on shared storage (we'd need to support non-local SSTs as ingest
inputs though).
RaduBerinde committed Feb 7, 2023
1 parent e35d9f5 commit 7d1e4ba
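
For illustration only (not part of this commit): a minimal standalone sketch of the new key payload, i.e. the file number encoded as an unsigned varint, mirroring what the batch.go change below does with binary.PutUvarint. FileNum is simplified to a plain uint64 here, and the function name is made up for the example.

package main

import (
	"encoding/binary"
	"fmt"
)

// encodeIngestKey builds the payload stored under an InternalKeyKindIngestSST
// entry in this sketch: just the file number, encoded as an unsigned varint.
func encodeIngestKey(fileNum uint64) []byte {
	buf := make([]byte, binary.MaxVarintLen64)
	n := binary.PutUvarint(buf, fileNum)
	return buf[:n]
}

func main() {
	key := encodeIngestKey(123456)
	fmt.Printf("encoded ingest key: %x (%d bytes)\n", key, len(key))
}

A varint keeps small file numbers to a byte or two while still supporting the full 64-bit range, and it avoids tying the WAL record to an on-disk path.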
Showing 6 changed files with 53 additions and 47 deletions.
12 changes: 7 additions & 5 deletions batch.go
@@ -828,9 +828,9 @@ func (b *Batch) LogData(data []byte, _ *WriteOptions) error {
return nil
}

- // IngestSST adds an sstable path to the batch. The data will only be written to
- // the WAL (not added to memtables or sstables).
- func (b *Batch) ingestSST(data []byte) {
+ // IngestSST adds the FileNum for an sstable to the batch. The data will only be
+ // written to the WAL (not added to memtables or sstables).
+ func (b *Batch) ingestSST(fileNum base.FileNum) {
if b.Empty() {
b.ingestedSSTBatch = true
} else if !b.ingestedSSTBatch {
@@ -839,8 +839,10 @@ func (b *Batch) ingestSST(data []byte) {
}

origMemTableSize := b.memTableSize
- b.prepareDeferredKeyRecord(len(data), InternalKeyKindIngestSST)
- copy(b.deferredOp.Key, data)
+ var buf [binary.MaxVarintLen64]byte
+ length := binary.PutUvarint(buf[:], uint64(fileNum))
+ b.prepareDeferredKeyRecord(length, InternalKeyKindIngestSST)
+ copy(b.deferredOp.Key, buf[:length])
// Since IngestSST writes only to the WAL and does not affect the memtable,
// we restore b.memTableSize to its original value. Note that Batch.count
// is not reset because for the InternalKeyKindIngestSST the count is the
21 changes: 16 additions & 5 deletions batch_test.go
@@ -51,11 +51,22 @@ func TestBatch(t *testing.T) {
}
}

+ encodeFileNum := func(n base.FileNum) string {
+ return string(binary.AppendUvarint(nil, uint64(n)))
+ }
+ decodeFileNum := func(d []byte) base.FileNum {
+ val, n := binary.Uvarint(d)
+ if n <= 0 {
+ t.Fatalf("invalid filenum encoding")
+ }
+ return base.FileNum(val)
+ }

// RangeKeySet and RangeKeyUnset are untested here because they don't expose
// deferred variants. This is a consequence of these keys' more complex
// value encodings.
testCases := []testCase{
{InternalKeyKindIngestSST, "1.sst", ""},
{InternalKeyKindIngestSST, encodeFileNum(1), ""},
{InternalKeyKindSet, "roses", "red"},
{InternalKeyKindSet, "violets", "blue"},
{InternalKeyKindDelete, "roses", ""},
@@ -99,7 +110,7 @@ func TestBatch(t *testing.T) {
case InternalKeyKindRangeKeyDelete:
_ = b.RangeKeyDelete([]byte(tc.key), []byte(tc.value), nil)
case InternalKeyKindIngestSST:
- b.ingestSST([]byte(tc.key))
+ b.ingestSST(decodeFileNum([]byte(tc.key)))
}
}
verifyTestCases(&b, testCases)
@@ -139,7 +150,7 @@ func TestBatch(t *testing.T) {
case InternalKeyKindLogData:
_ = b.LogData([]byte(tc.key), nil)
case InternalKeyKindIngestSST:
- b.ingestSST([]byte(tc.key))
+ b.ingestSST(decodeFileNum([]byte(tc.key)))
case InternalKeyKindRangeKeyDelete:
d := b.RangeKeyDeleteDeferred(len(key), len(value))
copy(d.Key, key)
@@ -154,9 +165,9 @@ func TestBatchIngestSST(t *testing.T) {
// Verify that Batch.IngestSST has the correct batch count and memtable
// size.
var b Batch
b.ingestSST([]byte("1.sst"))
b.ingestSST(1)
require.Equal(t, int(b.Count()), 1)
b.ingestSST([]byte("2.sst"))
b.ingestSST(2)
require.Equal(t, int(b.Count()), 2)
require.Equal(t, int(b.memTableSize), 0)
require.Equal(t, b.ingestedSSTBatch, true)
2 changes: 1 addition & 1 deletion flushable_test.go
@@ -78,7 +78,7 @@ func TestIngestedSSTFlushableAPI(t *testing.T) {
// (e.g. because the files reside on a different filesystem), ingestLink will
// fall back to copying, and if that fails we undo our work and return an
// error.
- if _, err := ingestLink(jobID, d.opts, d.objProvider, paths, meta); err != nil {
+ if err := ingestLink(jobID, d.opts, d.objProvider, paths, meta); err != nil {
panic("couldn't hard link sstables")
}

24 changes: 11 additions & 13 deletions ingest.go
@@ -263,31 +263,30 @@ func ingestCleanup(objProvider *objstorage.Provider, meta []*fileMetadata) error
return firstErr
}

+ // ingestLink creates new objects which are backed by either hardlinks to or
+ // copies of the ingested files.
func ingestLink(
jobID int, opts *Options, objProvider *objstorage.Provider, paths []string, meta []*fileMetadata,
- ) ([]string, error) {
- newPaths := make([]string, len(paths))
+ ) error {
for i := range paths {
- target := objProvider.Path(fileTypeTable, meta[i].FileNum)
- newPaths[i] = target
err := objProvider.LinkOrCopyFromLocal(opts.FS, paths[i], fileTypeTable, meta[i].FileNum)
if err != nil {
if err2 := ingestCleanup(objProvider, meta[:i]); err2 != nil {
opts.Logger.Infof("ingest cleanup failed: %v", err2)
}
- return nil, err
+ return err
}
if opts.EventListener.TableCreated != nil {
opts.EventListener.TableCreated(TableCreateInfo{
JobID: jobID,
Reason: "ingesting",
- Path: target,
+ Path: objProvider.Path(fileTypeTable, meta[i].FileNum),
FileNum: meta[i].FileNum,
})
}
}

- return newPaths, nil
+ return nil
}

func ingestMemtableOverlaps(cmp Compare, mem flushable, meta []*fileMetadata) bool {
@@ -722,10 +721,10 @@ func (d *DB) newIngestedFlushableEntry(
// we're holding both locks, the order in which we rotate the memtable or
// recycle the WAL in this function is irrelevant as long as the correct log
// numbers are assigned to the appropriate flushable.
- func (d *DB) handleIngestAsFlushable(paths []string, meta []*fileMetadata, seqNum uint64) error {
+ func (d *DB) handleIngestAsFlushable(meta []*fileMetadata, seqNum uint64) error {
b := d.NewBatch()
- for _, path := range paths {
- b.ingestSST([]byte(path))
+ for _, m := range meta {
+ b.ingestSST(m.FileNum)
}
b.setSeqNum(seqNum)

@@ -819,8 +818,7 @@ func (d *DB) ingest(
// (e.g. because the files reside on a different filesystem), ingestLink will
// fall back to copying, and if that fails we undo our work and return an
// error.
- newPaths, err := ingestLink(jobID, d.opts, d.objProvider, paths, meta)
- if err != nil {
+ if err := ingestLink(jobID, d.opts, d.objProvider, paths, meta); err != nil {
return IngestOperationStats{}, err
}
// Fsync the directory we added the tables to. We need to do this at some
@@ -861,7 +859,7 @@
// The ingestion overlaps with the memtable. Since there aren't
// too many memtables already queued up, we can slide the
// ingested sstables on top of the existing memtables.
- err = d.handleIngestAsFlushable(newPaths, meta, seqNum)
+ err = d.handleIngestAsFlushable(meta, seqNum)
asFlushable = true
return
}
4 changes: 2 additions & 2 deletions ingest_test.go
@@ -314,7 +314,7 @@ func TestIngestLink(t *testing.T) {
opts.FS.Remove(paths[i])
}

- _, err := ingestLink(0 /* jobID */, opts, objProvider, paths, meta)
+ err := ingestLink(0 /* jobID */, opts, objProvider, paths, meta)
if i < count {
if err == nil {
t.Fatalf("expected error, but found success")
@@ -375,7 +375,7 @@ func TestIngestLinkFallback(t *testing.T) {
objProvider := objstorage.New(objstorage.DefaultSettings(opts.FS, ""))

meta := []*fileMetadata{{FileNum: 1}}
- _, err = ingestLink(0, opts, objProvider, []string{"source"}, meta)
+ err = ingestLink(0, opts, objProvider, []string{"source"}, meta)
require.NoError(t, err)

dest, err := mem.Open("000001.sst")
37 changes: 16 additions & 21 deletions open.go
@@ -6,14 +6,12 @@ package pebble

import (
"bytes"
"encoding/binary"
"fmt"
"io"
"math"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"sync/atomic"
"time"

@@ -721,40 +719,37 @@ func (d *DB) replayWAL(

{
br := b.Reader()
- if kind, path, _, _ := br.Next(); kind == InternalKeyKindIngestSST {
- paths := make([]string, b.Count())
- fileNums := make([]FileNum, b.Count())
- addPath := func(path []byte, i int) {
- paths[i] = string(path)
- // TODO(bananabrick): Store the filenums in the batch as a
- // value, so that we don't have to perform this custom
- // parsing here.
- fileNum, err := strconv.Atoi(
- strings.TrimSuffix(filepath.Base(string(path)),
- filepath.Ext(string(path))),
- )
- if err != nil {
- panic("pebble: sstable file path is invalid.")
+ if kind, encodedFileNum, _, _ := br.Next(); kind == InternalKeyKindIngestSST {
+ fileNums := make([]FileNum, 0, b.Count())
+ addFileNum := func(encodedFileNum []byte) {
+ fileNum, n := binary.Uvarint(encodedFileNum)
+ if n <= 0 {
+ panic("pebble: ingest sstable file num is invalid.")
}
- fileNums[i] = FileNum(fileNum)
+ fileNums = append(fileNums, base.FileNum(fileNum))
}
- addPath(path, 0)
+ addFileNum(encodedFileNum)

for i := 1; i < int(b.Count()); i++ {
- kind, path, _, ok := br.Next()
+ kind, encodedFileNum, _, ok := br.Next()
if kind != InternalKeyKindIngestSST {
panic("pebble: invalid batch key kind.")
}
if !ok {
panic("pebble: invalid batch count.")
}
- addPath(path, i)
+ addFileNum(encodedFileNum)
}

if _, _, _, ok := br.Next(); ok {
panic("pebble: invalid number of entries in batch.")
}

+ paths := make([]string, len(fileNums))
+ for i, n := range fileNums {
+ paths[i] = base.MakeFilepath(d.opts.FS, d.dirname, fileTypeTable, n)
+ }

var meta []*manifest.FileMetadata
meta, _, err = ingestLoad(
d.opts, d.mu.formatVers.vers, paths, d.cacheID, fileNums,
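
For illustration only (not part of this commit): a rough sketch of the replay-side decoding shown in the open.go hunk above, where the varint payload is parsed back into a file number and mapped to an on-disk table name. In Pebble the path comes from base.MakeFilepath; the store directory and the %06d.sst naming used here (matching the "000001.sst" file opened in TestIngestLinkFallback) are stand-ins for the example.

package main

import (
	"encoding/binary"
	"fmt"
	"path/filepath"
)

// decodeIngestKey reverses the varint encoding of the ingest key payload and
// returns the file number, failing on an empty or malformed payload.
func decodeIngestKey(payload []byte) (uint64, error) {
	fileNum, n := binary.Uvarint(payload)
	if n <= 0 {
		return 0, fmt.Errorf("invalid ingest key payload")
	}
	return fileNum, nil
}

func main() {
	// Encode file number 1 the same way the batch does, then decode it back.
	payload := binary.AppendUvarint(nil, 1)
	fileNum, err := decodeIngestKey(payload)
	if err != nil {
		panic(err)
	}
	// Rebuild an sstable path from the file number (placeholder directory).
	path := filepath.Join("/path/to/store", fmt.Sprintf("%06d.sst", fileNum))
	fmt.Println(path) // /path/to/store/000001.sst
}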
