diff --git a/batch.go b/batch.go index 3f02a09bfe..5fe9052df0 100644 --- a/batch.go +++ b/batch.go @@ -828,9 +828,9 @@ func (b *Batch) LogData(data []byte, _ *WriteOptions) error { return nil } -// IngestSST adds an sstable path to the batch. The data will only be written to -// the WAL (not added to memtables or sstables). -func (b *Batch) ingestSST(data []byte) { +// IngestSST adds the FileNum for an sstable to the batch. The data will only be +// written to the WAL (not added to memtables or sstables). +func (b *Batch) ingestSST(fileNum base.FileNum) { if b.Empty() { b.ingestedSSTBatch = true } else if !b.ingestedSSTBatch { @@ -839,8 +839,10 @@ func (b *Batch) ingestSST(data []byte) { } origMemTableSize := b.memTableSize - b.prepareDeferredKeyRecord(len(data), InternalKeyKindIngestSST) - copy(b.deferredOp.Key, data) + var buf [binary.MaxVarintLen64]byte + length := binary.PutUvarint(buf[:], uint64(fileNum)) + b.prepareDeferredKeyRecord(length, InternalKeyKindIngestSST) + copy(b.deferredOp.Key, buf[:length]) // Since IngestSST writes only to the WAL and does not affect the memtable, // we restore b.memTableSize to its original value. Note that Batch.count // is not reset because for the InternalKeyKindIngestSST the count is the diff --git a/batch_test.go b/batch_test.go index b7bdc5b5a6..f2fd74e83c 100644 --- a/batch_test.go +++ b/batch_test.go @@ -51,11 +51,22 @@ func TestBatch(t *testing.T) { } } + encodeFileNum := func(n base.FileNum) string { + return string(binary.AppendUvarint(nil, uint64(n))) + } + decodeFileNum := func(d []byte) base.FileNum { + val, n := binary.Uvarint(d) + if n <= 0 { + t.Fatalf("invalid filenum encoding") + } + return base.FileNum(val) + } + // RangeKeySet and RangeKeyUnset are untested here because they don't expose // deferred variants. This is a consequence of these keys' more complex // value encodings. testCases := []testCase{ - {InternalKeyKindIngestSST, "1.sst", ""}, + {InternalKeyKindIngestSST, encodeFileNum(1), ""}, {InternalKeyKindSet, "roses", "red"}, {InternalKeyKindSet, "violets", "blue"}, {InternalKeyKindDelete, "roses", ""}, @@ -99,7 +110,7 @@ func TestBatch(t *testing.T) { case InternalKeyKindRangeKeyDelete: _ = b.RangeKeyDelete([]byte(tc.key), []byte(tc.value), nil) case InternalKeyKindIngestSST: - b.ingestSST([]byte(tc.key)) + b.ingestSST(decodeFileNum([]byte(tc.key))) } } verifyTestCases(&b, testCases) @@ -139,7 +150,7 @@ func TestBatch(t *testing.T) { case InternalKeyKindLogData: _ = b.LogData([]byte(tc.key), nil) case InternalKeyKindIngestSST: - b.ingestSST([]byte(tc.key)) + b.ingestSST(decodeFileNum([]byte(tc.key))) case InternalKeyKindRangeKeyDelete: d := b.RangeKeyDeleteDeferred(len(key), len(value)) copy(d.Key, key) @@ -154,9 +165,9 @@ func TestBatchIngestSST(t *testing.T) { // Verify that Batch.IngestSST has the correct batch count and memtable // size. var b Batch - b.ingestSST([]byte("1.sst")) + b.ingestSST(1) require.Equal(t, int(b.Count()), 1) - b.ingestSST([]byte("2.sst")) + b.ingestSST(2) require.Equal(t, int(b.Count()), 2) require.Equal(t, int(b.memTableSize), 0) require.Equal(t, b.ingestedSSTBatch, true) diff --git a/flushable_test.go b/flushable_test.go index 86510fb379..443ea9fce2 100644 --- a/flushable_test.go +++ b/flushable_test.go @@ -78,7 +78,7 @@ func TestIngestedSSTFlushableAPI(t *testing.T) { // (e.g. because the files reside on a different filesystem), ingestLink will // fall back to copying, and if that fails we undo our work and return an // error. - if _, err := ingestLink(jobID, d.opts, d.objProvider, paths, meta); err != nil { + if err := ingestLink(jobID, d.opts, d.objProvider, paths, meta); err != nil { panic("couldn't hard link sstables") } diff --git a/ingest.go b/ingest.go index 5424f39d7c..e67b711260 100644 --- a/ingest.go +++ b/ingest.go @@ -263,31 +263,30 @@ func ingestCleanup(objProvider *objstorage.Provider, meta []*fileMetadata) error return firstErr } +// ingestLink creates new objects which are backed by either hardlinks to or +// copies of the ingested files. func ingestLink( jobID int, opts *Options, objProvider *objstorage.Provider, paths []string, meta []*fileMetadata, -) ([]string, error) { - newPaths := make([]string, len(paths)) +) error { for i := range paths { - target := objProvider.Path(fileTypeTable, meta[i].FileNum) - newPaths[i] = target err := objProvider.LinkOrCopyFromLocal(opts.FS, paths[i], fileTypeTable, meta[i].FileNum) if err != nil { if err2 := ingestCleanup(objProvider, meta[:i]); err2 != nil { opts.Logger.Infof("ingest cleanup failed: %v", err2) } - return nil, err + return err } if opts.EventListener.TableCreated != nil { opts.EventListener.TableCreated(TableCreateInfo{ JobID: jobID, Reason: "ingesting", - Path: target, + Path: objProvider.Path(fileTypeTable, meta[i].FileNum), FileNum: meta[i].FileNum, }) } } - return newPaths, nil + return nil } func ingestMemtableOverlaps(cmp Compare, mem flushable, meta []*fileMetadata) bool { @@ -722,10 +721,10 @@ func (d *DB) newIngestedFlushableEntry( // we're holding both locks, the order in which we rotate the memtable or // recycle the WAL in this function is irrelevant as long as the correct log // numbers are assigned to the appropriate flushable. -func (d *DB) handleIngestAsFlushable(paths []string, meta []*fileMetadata, seqNum uint64) error { +func (d *DB) handleIngestAsFlushable(meta []*fileMetadata, seqNum uint64) error { b := d.NewBatch() - for _, path := range paths { - b.ingestSST([]byte(path)) + for _, m := range meta { + b.ingestSST(m.FileNum) } b.setSeqNum(seqNum) @@ -819,8 +818,7 @@ func (d *DB) ingest( // (e.g. because the files reside on a different filesystem), ingestLink will // fall back to copying, and if that fails we undo our work and return an // error. - newPaths, err := ingestLink(jobID, d.opts, d.objProvider, paths, meta) - if err != nil { + if err := ingestLink(jobID, d.opts, d.objProvider, paths, meta); err != nil { return IngestOperationStats{}, err } // Fsync the directory we added the tables to. We need to do this at some @@ -861,7 +859,7 @@ func (d *DB) ingest( // The ingestion overlaps with the memtable. Since there aren't // too many memtables already queued up, we can slide the // ingested sstables on top of the existing memtables. - err = d.handleIngestAsFlushable(newPaths, meta, seqNum) + err = d.handleIngestAsFlushable(meta, seqNum) asFlushable = true return } diff --git a/ingest_test.go b/ingest_test.go index f89a1ff563..9af2962854 100644 --- a/ingest_test.go +++ b/ingest_test.go @@ -314,7 +314,7 @@ func TestIngestLink(t *testing.T) { opts.FS.Remove(paths[i]) } - _, err := ingestLink(0 /* jobID */, opts, objProvider, paths, meta) + err := ingestLink(0 /* jobID */, opts, objProvider, paths, meta) if i < count { if err == nil { t.Fatalf("expected error, but found success") @@ -375,7 +375,7 @@ func TestIngestLinkFallback(t *testing.T) { objProvider := objstorage.New(objstorage.DefaultSettings(opts.FS, "")) meta := []*fileMetadata{{FileNum: 1}} - _, err = ingestLink(0, opts, objProvider, []string{"source"}, meta) + err = ingestLink(0, opts, objProvider, []string{"source"}, meta) require.NoError(t, err) dest, err := mem.Open("000001.sst") diff --git a/open.go b/open.go index a00b57a04e..70f2eb23de 100644 --- a/open.go +++ b/open.go @@ -6,14 +6,12 @@ package pebble import ( "bytes" + "encoding/binary" "fmt" "io" "math" "os" - "path/filepath" "sort" - "strconv" - "strings" "sync/atomic" "time" @@ -721,40 +719,37 @@ func (d *DB) replayWAL( { br := b.Reader() - if kind, path, _, _ := br.Next(); kind == InternalKeyKindIngestSST { - paths := make([]string, b.Count()) - fileNums := make([]FileNum, b.Count()) - addPath := func(path []byte, i int) { - paths[i] = string(path) - // TODO(bananabrick): Store the filenums in the batch as a - // value, so that we don't have to perform this custom - // parsing here. - fileNum, err := strconv.Atoi( - strings.TrimSuffix(filepath.Base(string(path)), - filepath.Ext(string(path))), - ) - if err != nil { - panic("pebble: sstable file path is invalid.") + if kind, encodedFileNum, _, _ := br.Next(); kind == InternalKeyKindIngestSST { + fileNums := make([]FileNum, 0, b.Count()) + addFileNum := func(encodedFileNum []byte) { + fileNum, n := binary.Uvarint(encodedFileNum) + if n <= 0 { + panic("pebble: ingest sstable file num is invalid.") } - fileNums[i] = FileNum(fileNum) + fileNums = append(fileNums, base.FileNum(fileNum)) } - addPath(path, 0) + addFileNum(encodedFileNum) for i := 1; i < int(b.Count()); i++ { - kind, path, _, ok := br.Next() + kind, encodedFileNum, _, ok := br.Next() if kind != InternalKeyKindIngestSST { panic("pebble: invalid batch key kind.") } if !ok { panic("pebble: invalid batch count.") } - addPath(path, i) + addFileNum(encodedFileNum) } if _, _, _, ok := br.Next(); ok { panic("pebble: invalid number of entries in batch.") } + paths := make([]string, len(fileNums)) + for i, n := range fileNums { + paths[i] = base.MakeFilepath(d.opts.FS, d.dirname, fileTypeTable, n) + } + var meta []*manifest.FileMetadata meta, _, err = ingestLoad( d.opts, d.mu.formatVers.vers, paths, d.cacheID, fileNums,