From 7d1e4ba7ffd05ea2be4cd9e67d04493ee01d66ee Mon Sep 17 00:00:00 2001
From: Radu Berinde
Date: Thu, 2 Feb 2023 12:51:34 -0800
Subject: [PATCH] db: change flushable ingest key to encode just the FileNum

The `InternalKeyKindIngestSST` key contains the file path of an SST
object. We don't normally store paths in metadata, as paths are fairly
fragile (e.g. they break if the store is moved). Since this is always an
SST object that we created (usually through a hard link), we can store
just the FileNum.

This also opens the door to storing this object on shared storage,
though we'd first need to support non-local SSTs as ingest inputs.
---
 batch.go          | 12 +++++++-----
 batch_test.go     | 21 ++++++++++++++++-----
 flushable_test.go |  2 +-
 ingest.go         | 24 +++++++++++-------------
 ingest_test.go    |  4 ++--
 open.go           | 37 ++++++++++++++++---------------------
 6 files changed, 53 insertions(+), 47 deletions(-)

diff --git a/batch.go b/batch.go
index 3f02a09bfe..5fe9052df0 100644
--- a/batch.go
+++ b/batch.go
@@ -828,9 +828,9 @@ func (b *Batch) LogData(data []byte, _ *WriteOptions) error {
 	return nil
 }
 
-// IngestSST adds an sstable path to the batch. The data will only be written to
-// the WAL (not added to memtables or sstables).
-func (b *Batch) ingestSST(data []byte) {
+// IngestSST adds the FileNum for an sstable to the batch. The data will only be
+// written to the WAL (not added to memtables or sstables).
+func (b *Batch) ingestSST(fileNum base.FileNum) {
 	if b.Empty() {
 		b.ingestedSSTBatch = true
 	} else if !b.ingestedSSTBatch {
@@ -839,8 +839,10 @@
 	}
 
 	origMemTableSize := b.memTableSize
-	b.prepareDeferredKeyRecord(len(data), InternalKeyKindIngestSST)
-	copy(b.deferredOp.Key, data)
+	var buf [binary.MaxVarintLen64]byte
+	length := binary.PutUvarint(buf[:], uint64(fileNum))
+	b.prepareDeferredKeyRecord(length, InternalKeyKindIngestSST)
+	copy(b.deferredOp.Key, buf[:length])
 	// Since IngestSST writes only to the WAL and does not affect the memtable,
 	// we restore b.memTableSize to its original value. Note that Batch.count
 	// is not reset because for the InternalKeyKindIngestSST the count is the
diff --git a/batch_test.go b/batch_test.go
index b7bdc5b5a6..f2fd74e83c 100644
--- a/batch_test.go
+++ b/batch_test.go
@@ -51,11 +51,22 @@ func TestBatch(t *testing.T) {
 		}
 	}
 
+	encodeFileNum := func(n base.FileNum) string {
+		return string(binary.AppendUvarint(nil, uint64(n)))
+	}
+	decodeFileNum := func(d []byte) base.FileNum {
+		val, n := binary.Uvarint(d)
+		if n <= 0 {
+			t.Fatalf("invalid filenum encoding")
+		}
+		return base.FileNum(val)
+	}
+
 	// RangeKeySet and RangeKeyUnset are untested here because they don't expose
 	// deferred variants. This is a consequence of these keys' more complex
 	// value encodings.
 	testCases := []testCase{
-		{InternalKeyKindIngestSST, "1.sst", ""},
+		{InternalKeyKindIngestSST, encodeFileNum(1), ""},
 		{InternalKeyKindSet, "roses", "red"},
 		{InternalKeyKindSet, "violets", "blue"},
 		{InternalKeyKindDelete, "roses", ""},
@@ -99,7 +110,7 @@
 		case InternalKeyKindRangeKeyDelete:
 			_ = b.RangeKeyDelete([]byte(tc.key), []byte(tc.value), nil)
 		case InternalKeyKindIngestSST:
-			b.ingestSST([]byte(tc.key))
+			b.ingestSST(decodeFileNum([]byte(tc.key)))
 		}
 	}
 	verifyTestCases(&b, testCases)
@@ -139,7 +150,7 @@
 		case InternalKeyKindLogData:
 			_ = b.LogData([]byte(tc.key), nil)
 		case InternalKeyKindIngestSST:
-			b.ingestSST([]byte(tc.key))
+			b.ingestSST(decodeFileNum([]byte(tc.key)))
 		case InternalKeyKindRangeKeyDelete:
 			d := b.RangeKeyDeleteDeferred(len(key), len(value))
 			copy(d.Key, key)
@@ -154,9 +165,9 @@ func TestBatchIngestSST(t *testing.T) {
 	// Verify that Batch.IngestSST has the correct batch count and memtable
 	// size.
 	var b Batch
-	b.ingestSST([]byte("1.sst"))
+	b.ingestSST(1)
 	require.Equal(t, int(b.Count()), 1)
-	b.ingestSST([]byte("2.sst"))
+	b.ingestSST(2)
 	require.Equal(t, int(b.Count()), 2)
 	require.Equal(t, int(b.memTableSize), 0)
 	require.Equal(t, b.ingestedSSTBatch, true)
diff --git a/flushable_test.go b/flushable_test.go
index 86510fb379..443ea9fce2 100644
--- a/flushable_test.go
+++ b/flushable_test.go
@@ -78,7 +78,7 @@ func TestIngestedSSTFlushableAPI(t *testing.T) {
 	// (e.g. because the files reside on a different filesystem), ingestLink will
 	// fall back to copying, and if that fails we undo our work and return an
 	// error.
-	if _, err := ingestLink(jobID, d.opts, d.objProvider, paths, meta); err != nil {
+	if err := ingestLink(jobID, d.opts, d.objProvider, paths, meta); err != nil {
 		panic("couldn't hard link sstables")
 	}
 
diff --git a/ingest.go b/ingest.go
index 5424f39d7c..e67b711260 100644
--- a/ingest.go
+++ b/ingest.go
@@ -263,31 +263,30 @@ func ingestCleanup(objProvider *objstorage.Provider, meta []*fileMetadata) error
 	return firstErr
 }
 
+// ingestLink creates new objects which are backed by either hardlinks to or
+// copies of the ingested files.
 func ingestLink(
 	jobID int, opts *Options, objProvider *objstorage.Provider, paths []string, meta []*fileMetadata,
-) ([]string, error) {
-	newPaths := make([]string, len(paths))
+) error {
 	for i := range paths {
-		target := objProvider.Path(fileTypeTable, meta[i].FileNum)
-		newPaths[i] = target
 		err := objProvider.LinkOrCopyFromLocal(opts.FS, paths[i], fileTypeTable, meta[i].FileNum)
 		if err != nil {
 			if err2 := ingestCleanup(objProvider, meta[:i]); err2 != nil {
 				opts.Logger.Infof("ingest cleanup failed: %v", err2)
 			}
-			return nil, err
+			return err
 		}
 
 		if opts.EventListener.TableCreated != nil {
 			opts.EventListener.TableCreated(TableCreateInfo{
 				JobID:   jobID,
 				Reason:  "ingesting",
-				Path:    target,
+				Path:    objProvider.Path(fileTypeTable, meta[i].FileNum),
 				FileNum: meta[i].FileNum,
 			})
 		}
 	}
-	return newPaths, nil
+	return nil
 }
 
 func ingestMemtableOverlaps(cmp Compare, mem flushable, meta []*fileMetadata) bool {
@@ -722,10 +721,10 @@ func (d *DB) newIngestedFlushableEntry(
 	// we're holding both locks, the order in which we rotate the memtable or
 	// recycle the WAL in this function is irrelevant as long as the correct log
 	// numbers are assigned to the appropriate flushable.
-func (d *DB) handleIngestAsFlushable(paths []string, meta []*fileMetadata, seqNum uint64) error {
+func (d *DB) handleIngestAsFlushable(meta []*fileMetadata, seqNum uint64) error {
 	b := d.NewBatch()
-	for _, path := range paths {
-		b.ingestSST([]byte(path))
+	for _, m := range meta {
+		b.ingestSST(m.FileNum)
 	}
 	b.setSeqNum(seqNum)
 
@@ -819,8 +818,7 @@
 	// (e.g. because the files reside on a different filesystem), ingestLink will
 	// fall back to copying, and if that fails we undo our work and return an
 	// error.
-	newPaths, err := ingestLink(jobID, d.opts, d.objProvider, paths, meta)
-	if err != nil {
+	if err := ingestLink(jobID, d.opts, d.objProvider, paths, meta); err != nil {
 		return IngestOperationStats{}, err
 	}
 	// Fsync the directory we added the tables to. We need to do this at some
@@ -861,7 +859,7 @@
 			// The ingestion overlaps with the memtable. Since there aren't
 			// too many memtables already queued up, we can slide the
 			// ingested sstables on top of the existing memtables.
-			err = d.handleIngestAsFlushable(newPaths, meta, seqNum)
+			err = d.handleIngestAsFlushable(meta, seqNum)
 			asFlushable = true
 			return
 		}
diff --git a/ingest_test.go b/ingest_test.go
index f89a1ff563..9af2962854 100644
--- a/ingest_test.go
+++ b/ingest_test.go
@@ -314,7 +314,7 @@ func TestIngestLink(t *testing.T) {
 			opts.FS.Remove(paths[i])
 		}
-		_, err := ingestLink(0 /* jobID */, opts, objProvider, paths, meta)
+		err := ingestLink(0 /* jobID */, opts, objProvider, paths, meta)
 		if i < count {
 			if err == nil {
 				t.Fatalf("expected error, but found success")
 			}
@@ -375,7 +375,7 @@ func TestIngestLinkFallback(t *testing.T) {
 	objProvider := objstorage.New(objstorage.DefaultSettings(opts.FS, ""))
 
 	meta := []*fileMetadata{{FileNum: 1}}
-	_, err = ingestLink(0, opts, objProvider, []string{"source"}, meta)
+	err = ingestLink(0, opts, objProvider, []string{"source"}, meta)
 	require.NoError(t, err)
 
 	dest, err := mem.Open("000001.sst")
diff --git a/open.go b/open.go
index a00b57a04e..70f2eb23de 100644
--- a/open.go
+++ b/open.go
@@ -6,14 +6,12 @@ package pebble
 
 import (
 	"bytes"
+	"encoding/binary"
 	"fmt"
 	"io"
 	"math"
 	"os"
-	"path/filepath"
 	"sort"
-	"strconv"
-	"strings"
 	"sync/atomic"
 	"time"
 
@@ -721,40 +719,37 @@ func (d *DB) replayWAL(
 		{
 			br := b.Reader()
-			if kind, path, _, _ := br.Next(); kind == InternalKeyKindIngestSST {
-				paths := make([]string, b.Count())
-				fileNums := make([]FileNum, b.Count())
-				addPath := func(path []byte, i int) {
-					paths[i] = string(path)
-					// TODO(bananabrick): Store the filenums in the batch as a
-					// value, so that we don't have to perform this custom
-					// parsing here.
-					fileNum, err := strconv.Atoi(
-						strings.TrimSuffix(filepath.Base(string(path)),
-							filepath.Ext(string(path))),
-					)
-					if err != nil {
-						panic("pebble: sstable file path is invalid.")
+			if kind, encodedFileNum, _, _ := br.Next(); kind == InternalKeyKindIngestSST {
+				fileNums := make([]FileNum, 0, b.Count())
+				addFileNum := func(encodedFileNum []byte) {
+					fileNum, n := binary.Uvarint(encodedFileNum)
+					if n <= 0 {
+						panic("pebble: ingest sstable file num is invalid.")
 					}
-					fileNums[i] = FileNum(fileNum)
+					fileNums = append(fileNums, base.FileNum(fileNum))
 				}
-				addPath(path, 0)
+				addFileNum(encodedFileNum)
 				for i := 1; i < int(b.Count()); i++ {
-					kind, path, _, ok := br.Next()
+					kind, encodedFileNum, _, ok := br.Next()
 					if kind != InternalKeyKindIngestSST {
 						panic("pebble: invalid batch key kind.")
 					}
 					if !ok {
 						panic("pebble: invalid batch count.")
 					}
-					addPath(path, i)
+					addFileNum(encodedFileNum)
 				}
 				if _, _, _, ok := br.Next(); ok {
 					panic("pebble: invalid number of entries in batch.")
 				}
+				paths := make([]string, len(fileNums))
+				for i, n := range fileNums {
+					paths[i] = base.MakeFilepath(d.opts.FS, d.dirname, fileTypeTable, n)
+				}
+
 				var meta []*manifest.FileMetadata
 				meta, _, err = ingestLoad(
 					d.opts, d.mu.formatVers.vers, paths, d.cacheID, fileNums,
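For illustration, here is a small, self-contained Go sketch of the encoding this patch adopts. It is not Pebble code: the local FileNum type and the "%06d.sst" filename are stand-ins for base.FileNum and the path that base.MakeFilepath builds for a table. The key of an InternalKeyKindIngestSST batch record is just the uvarint-encoded file number; WAL replay decodes it and rebuilds the table path, so no path is ever persisted.

// filenum_key_sketch.go
//
// Hypothetical, standalone sketch (not Pebble's API): encode a file number as
// the key of an ingest-SST batch record, decode it during replay, and rebuild
// the sstable path from the decoded number.
package main

import (
	"encoding/binary"
	"fmt"
	"path/filepath"
)

// FileNum stands in for pebble's base.FileNum (a uint64 file identifier).
type FileNum uint64

// encodeFileNumKey returns the varint-encoded key bytes, mirroring how
// ingestSST now writes them with binary.PutUvarint.
func encodeFileNumKey(n FileNum) []byte {
	buf := make([]byte, binary.MaxVarintLen64)
	length := binary.PutUvarint(buf, uint64(n))
	return buf[:length]
}

// decodeFileNumKey is the inverse, mirroring the replayWAL side, which treats
// a non-positive length from binary.Uvarint as corruption.
func decodeFileNumKey(key []byte) (FileNum, error) {
	v, n := binary.Uvarint(key)
	if n <= 0 {
		return 0, fmt.Errorf("invalid ingest sstable file num encoding")
	}
	return FileNum(v), nil
}

// tablePath rebuilds the table's path from its file number; in the patch this
// is done with base.MakeFilepath(fs, dirname, fileTypeTable, fileNum).
func tablePath(dirname string, n FileNum) string {
	return filepath.Join(dirname, fmt.Sprintf("%06d.sst", uint64(n)))
}

func main() {
	key := encodeFileNumKey(1)
	fmt.Printf("key bytes: %x\n", key) // a single byte, instead of a full path

	n, err := decodeFileNumKey(key)
	if err != nil {
		panic(err)
	}
	fmt.Println(tablePath("/data/pebble", n)) // /data/pebble/000001.sst
}

Deriving the path at replay time rather than storing it is what lets the record survive a store being moved, and is what leaves room for the object to live on shared storage later, as the commit message notes.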