sstable: introduce objstorage interface
This change introduces `objstorage.Provider`, which replaces uses of
`vfs.FS` / `vfs.File` in sstable code.

We pull the write-buffering and read-ahead logic out of the sstable code
and into the Provider implementation. This simplifies the sstable reader
code considerably.
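
For illustration, here is the new call pattern in one place, assembled from
the hunks below; the helper function and its plumbing are assumptions, but
each provider call appears verbatim in this commit's diffs, and the error
handling mirrors them.

```go
package pebble

import (
	"github.com/cockroachdb/pebble/objstorage"
	"github.com/cockroachdb/pebble/sstable"
)

// writeThenReadTable is an illustrative composite of the hunks in this
// commit, not code from the change itself.
func writeThenReadTable(
	d *DB, fileNum FileNum,
	writerOpts sstable.WriterOptions, readerOpts sstable.ReaderOptions,
) error {
	provider := objstorage.New(objstorage.Settings{
		FS:            d.opts.FS,
		FSDirName:     d.dirname,
		NoSyncOnClose: d.opts.NoSyncOnClose,
		BytesPerSync:  d.opts.BytesPerSync,
	})

	// Create returns an objstorage.Writable; write buffering and syncing
	// now happen inside the provider, so callers no longer wrap the file
	// in vfs.NewSyncingFile.
	writable, err := provider.Create(fileTypeTable, fileNum)
	if err != nil {
		return err
	}
	w := sstable.NewWriter(writable, writerOpts)
	if err := w.Close(); err != nil {
		return err
	}

	// OpenForReading hides the read-ahead logic that previously lived in
	// the sstable reader.
	readable, err := provider.OpenForReading(fileTypeTable, fileNum)
	if err != nil {
		return err
	}
	r, err := sstable.NewReader(readable, readerOpts)
	if err != nil {
		return err
	}
	if err := r.Close(); err != nil {
		return err
	}

	// Removal is addressed by (file type, file number) rather than path.
	return provider.Remove(fileTypeTable, fileNum)
}
```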

The provider will be extended to support shared storage, along the
following lines (see the sketch after this list):
 - define a new `SharedObjectStore` interface
 - the Provider stores a `SharedObjectStore` instance
 - the Provider internally maintains a mapping from `FileNum` to
   backend type (local vs shared). Methods like `OpenForReading` use
   the mapping to talk to the correct backend.
 - `Create` needs to be augmented with extra information (e.g. LSM
   level?) to decide where a new object is created
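
As a loose sketch of that plan (nothing below exists in this commit; every
name, signature, and the placement policy are placeholders):

```go
package objstorage

import "github.com/cockroachdb/pebble/internal/base"

// SharedObjectStore would abstract the shared (remote) object backend.
type SharedObjectStore interface {
	CreateObject(name string) (Writable, error)
	OpenObject(name string) (Readable, error)
	RemoveObject(name string) error
}

// objBackend records which backend holds a given object.
type objBackend int8

const (
	backendLocal objBackend = iota
	backendShared
)

// sharedProvider holds the shared store plus a FileNum-to-backend mapping
// that methods like OpenForReading would consult.
type sharedProvider struct {
	shared  SharedObjectStore
	backend map[base.FileNum]objBackend
}

// Create takes extra placement input (e.g. the target LSM level) to decide
// which backend receives the new object.
func (p *sharedProvider) Create(
	fileType base.FileType, fileNum base.FileNum, level int,
) (Writable, error) {
	if p.backend == nil {
		p.backend = make(map[base.FileNum]objBackend)
	}
	be := backendLocal
	if level >= 5 { // placeholder policy: bottom levels go to shared storage
		be = backendShared
	}
	p.backend[fileNum] = be
	// ... create the object on the chosen backend ...
	return nil, nil
}
```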
RaduBerinde committed Jan 23, 2023
1 parent b418e86 commit 08787ac
Showing 36 changed files with 1,022 additions and 544 deletions.
51 changes: 28 additions & 23 deletions compaction.go
@@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/pebble/internal/private"
"github.com/cockroachdb/pebble/internal/rangedel"
"github.com/cockroachdb/pebble/internal/rangekey"
"github.com/cockroachdb/pebble/objstorage"
"github.com/cockroachdb/pebble/sstable"
"github.com/cockroachdb/pebble/vfs"
)
@@ -267,19 +268,19 @@ func (u *userKeyChangeSplitter) onNewOutput(key *InternalKey) []byte {
return u.splitter.onNewOutput(key)
}

// compactionFile is a vfs.File wrapper that, on every write, updates a metric
// in `versions` on bytes written by in-progress compactions so far. It also
// increments a per-compaction `written` int.
type compactionFile struct {
vfs.File
// compactionWritable is an objstorage.Writable wrapper that, on every write,
// updates a metric in `versions` on bytes written by in-progress compactions so
// far. It also increments a per-compaction `written` int.
type compactionWritable struct {
objstorage.Writable

versions *versionSet
written *int64
}

// Write implements the io.Writer interface.
func (c *compactionFile) Write(p []byte) (n int, err error) {
n, err = c.File.Write(p)
// Write is part of the objstorage.Writable interface.
func (c *compactionWritable) Write(p []byte) (n int, err error) {
n, err = c.Writable.Write(p)
if err != nil {
return n, err
}
@@ -2264,6 +2265,14 @@ func (d *DB) runCompaction(
}
}()

// TODO(radu): this should be created once, at a higher level.
provider := objstorage.New(objstorage.Settings{
FS: d.opts.FS,
FSDirName: d.dirname,
NoSyncOnClose: d.opts.NoSyncOnClose,
BytesPerSync: d.opts.BytesPerSync,
})

// Check for a delete-only compaction. This can occur when wide range
// tombstones completely contain sstables.
if c.kind == compactionKindDeleteOnly {
@@ -2341,8 +2350,8 @@
c.elideRangeTombstone, d.FormatMajorVersion())

var (
filenames []string
tw *sstable.Writer
createdFiles []base.FileNum
tw *sstable.Writer
)
defer func() {
if iter != nil {
@@ -2352,8 +2361,8 @@
retErr = firstError(retErr, tw.Close())
}
if retErr != nil {
for _, filename := range filenames {
d.opts.FS.Remove(filename)
for _, fileNum := range createdFiles {
_ = provider.Remove(fileTypeTable, fileNum)
}
}
for _, closer := range c.closers {
@@ -2428,31 +2437,27 @@
pendingOutputs = append(pendingOutputs, fileMeta)
d.mu.Unlock()

filename := base.MakeFilepath(d.opts.FS, d.dirname, fileTypeTable, fileNum)
file, err := d.opts.FS.Create(filename)
writable, err := provider.Create(fileTypeTable, fileNum)
if err != nil {
return err
}

reason := "flushing"
if c.flushing == nil {
reason = "compacting"
}
d.opts.EventListener.TableCreated(TableCreateInfo{
JobID: jobID,
Reason: reason,
Path: filename,
Path: provider.Path(fileTypeTable, fileNum),
FileNum: fileNum,
})
file = vfs.NewSyncingFile(file, vfs.SyncingFileOptions{
NoSyncOnClose: d.opts.NoSyncOnClose,
BytesPerSync: d.opts.BytesPerSync,
})
file = &compactionFile{
File: file,
writable = &compactionWritable{
Writable: writable,
versions: d.mu.versions,
written: &c.bytesWritten,
}
filenames = append(filenames, filename)
createdFiles = append(createdFiles, fileNum)
cacheOpts := private.SSTableCacheOpts(d.cacheID, fileNum).(sstable.WriterOption)
internalTableOpt := private.SSTableInternalTableOpt.(sstable.WriterOption)

@@ -2464,7 +2469,7 @@
d.opts.Experimental.MaxWriterConcurrency > 0 &&
(cpuWorkHandle.Permitted() || d.opts.Experimental.ForceWriterParallelism)

tw = sstable.NewWriter(file, writerOpts, cacheOpts, internalTableOpt, &prevPointKey)
tw = sstable.NewWriter(writable, writerOpts, cacheOpts, internalTableOpt, &prevPointKey)

fileMeta.CreationTime = time.Now().Unix()
ve.NewFiles = append(ve.NewFiles, newFileEntry{
4 changes: 3 additions & 1 deletion compaction_test.go
@@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/pebble/internal/errorfs"
"github.com/cockroachdb/pebble/internal/keyspan"
"github.com/cockroachdb/pebble/internal/manifest"
"github.com/cockroachdb/pebble/objstorage"
"github.com/cockroachdb/pebble/sstable"
"github.com/cockroachdb/pebble/vfs"
"github.com/stretchr/testify/require"
@@ -994,10 +995,11 @@ func TestCompaction(t *testing.T) {
}
ss := []string(nil)
v := d.mu.versions.currentVersion()
provider := objstorage.New(objstorage.DefaultSettings(mem, "" /* dirName */))
for _, levelMetadata := range v.Levels {
iter := levelMetadata.Iter()
for meta := iter.First(); meta != nil; meta = iter.Next() {
f, err := mem.Open(base.MakeFilepath(mem, "", fileTypeTable, meta.FileNum))
f, err := provider.OpenForReading(base.FileTypeTable, meta.FileNum)
if err != nil {
return "", "", errors.WithStack(err)
}
6 changes: 5 additions & 1 deletion external_iterator.go
@@ -318,7 +318,11 @@ func openExternalTables(
) (readers []*sstable.Reader, err error) {
readers = make([]*sstable.Reader, 0, len(files))
for i := range files {
r, err := sstable.NewReader(files[i], readerOpts, extraReaderOpts...)
readable, err := sstable.NewSimpleReadable(files[i])
if err != nil {
return readers, err
}
r, err := sstable.NewReader(readable, readerOpts, extraReaderOpts...)
if err != nil {
return readers, err
}
5 changes: 4 additions & 1 deletion external_iterator_test.go
@@ -227,7 +227,10 @@ func TestIterRandomizedMaybeFilteredKeys(t *testing.T) {
f1, err := mem.Open(filename)
require.NoError(t, err)

r, err := sstable.NewReader(f1, sstable.ReaderOptions{
readable, err := sstable.NewSimpleReadable(f1)
require.NoError(t, err)

r, err := sstable.NewReader(readable, sstable.ReaderOptions{
Cache: c,
Comparer: testkeys.Comparer,
})
8 changes: 4 additions & 4 deletions ingest.go
@@ -47,18 +47,18 @@ func ingestValidateKey(opts *Options, key *InternalKey) error {
func ingestLoad1(
opts *Options, fmv FormatMajorVersion, path string, cacheID uint64, fileNum FileNum,
) (*fileMetadata, error) {
stat, err := opts.FS.Stat(path)
f, err := opts.FS.Open(path)
if err != nil {
return nil, err
}

f, err := opts.FS.Open(path)
readable, err := sstable.NewSimpleReadable(f)
if err != nil {
return nil, err
}

cacheOpts := private.SSTableCacheOpts(cacheID, fileNum).(sstable.ReaderOption)
r, err := sstable.NewReader(f, opts.MakeReaderOptions(), cacheOpts)
r, err := sstable.NewReader(readable, opts.MakeReaderOptions(), cacheOpts)
if err != nil {
return nil, err
}
@@ -78,7 +78,7 @@ func ingestLoad1(

meta := &fileMetadata{}
meta.FileNum = fileNum
meta.Size = uint64(stat.Size())
meta.Size = uint64(readable.Size())
meta.CreationTime = time.Now().Unix()

// Avoid loading into the table cache for collecting stats if we
4 changes: 3 additions & 1 deletion ingest_test.go
@@ -1622,9 +1622,11 @@ func TestIngestValidation(t *testing.T) {
defer func() { require.NoError(t, d.Close()) }()

corrupt := func(f vfs.File) {
readable, err := sstable.NewSimpleReadable(f)
require.NoError(t, err)
// Compute the layout of the sstable in order to find the
// appropriate block locations to corrupt.
r, err := sstable.NewReader(f, sstable.ReaderOptions{})
r, err := sstable.NewReader(readable, sstable.ReaderOptions{})
require.NoError(t, err)
l, err := r.Layout()
require.NoError(t, err)
6 changes: 5 additions & 1 deletion level_checker_test.go
@@ -203,8 +203,12 @@ func TestCheckLevelsCornerCases(t *testing.T) {
if err != nil {
return err.Error()
}
readable, err := sstable.NewSimpleReadable(f)
if err != nil {
return err.Error()
}
cacheOpts := private.SSTableCacheOpts(0, fileNum-1).(sstable.ReaderOption)
r, err := sstable.NewReader(f, sstable.ReaderOptions{}, cacheOpts)
r, err := sstable.NewReader(readable, sstable.ReaderOptions{}, cacheOpts)
if err != nil {
return err.Error()
}
12 changes: 10 additions & 2 deletions level_iter_test.go
@@ -251,7 +251,11 @@ func (lt *levelIterTest) runBuild(d *datadriven.TestData) string {
if err != nil {
return err.Error()
}
r, err := sstable.NewReader(f1, sstable.ReaderOptions{
readable, err := sstable.NewSimpleReadable(f1)
if err != nil {
return err.Error()
}
r, err := sstable.NewReader(readable, sstable.ReaderOptions{
Filters: map[string]FilterPolicy{
fp.Name(): fp,
},
@@ -479,7 +483,11 @@ func buildLevelIterTables(
if err != nil {
b.Fatal(err)
}
readers[i], err = sstable.NewReader(f, opts)
readable, err := sstable.NewSimpleReadable(f)
if err != nil {
b.Fatal(err)
}
readers[i], err = sstable.NewReader(readable, opts)
if err != nil {
b.Fatal(err)
}
18 changes: 15 additions & 3 deletions merging_iter_test.go
@@ -234,7 +234,11 @@ func TestMergingIterCornerCases(t *testing.T) {
if err != nil {
return err.Error()
}
r, err := sstable.NewReader(f, sstable.ReaderOptions{})
readable, err := sstable.NewSimpleReadable(f)
if err != nil {
return err.Error()
}
r, err := sstable.NewReader(readable, sstable.ReaderOptions{})
if err != nil {
return err.Error()
}
@@ -327,7 +331,11 @@ func buildMergingIterTables(
if err != nil {
b.Fatal(err)
}
readers[i], err = sstable.NewReader(f, opts)
readable, err := sstable.NewSimpleReadable(f)
if err != nil {
b.Fatal(err)
}
readers[i], err = sstable.NewReader(readable, opts)
if err != nil {
b.Fatal(err)
}
@@ -563,7 +571,11 @@ func buildLevelsForMergingIterSeqSeek(
if err != nil {
b.Fatal(err)
}
r, err := sstable.NewReader(f, opts)
readable, err := sstable.NewSimpleReadable(f)
if err != nil {
b.Fatal(err)
}
r, err := sstable.NewReader(readable, opts)
if err != nil {
b.Fatal(err)
}
29 changes: 29 additions & 0 deletions objstorage/noop_readahead.go
@@ -0,0 +1,29 @@
// Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

package objstorage

import "io"

// NoopReadaheadHandle can be used by Readable implementations that don't
// support read-ahead.
type NoopReadaheadHandle struct {
io.ReaderAt
}

// MakeNoopReadaheadHandle initializes a NoopReadaheadHandle.
func MakeNoopReadaheadHandle(r io.ReaderAt) NoopReadaheadHandle {
return NoopReadaheadHandle{ReaderAt: r}
}

var _ ReadaheadHandle = (*NoopReadaheadHandle)(nil)

// Close is part of the ReadaheadHandle interface.
func (*NoopReadaheadHandle) Close() error { return nil }

// MaxReadahead is part of the ReadaheadHandle interface.
func (*NoopReadaheadHandle) MaxReadahead() {}

// RecordCacheHit is part of the ReadaheadHandle interface.
func (*NoopReadaheadHandle) RecordCacheHit(offset, size int64) {}