diff --git a/compaction.go b/compaction.go index 1b531b5727..6677ebd897 100644 --- a/compaction.go +++ b/compaction.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/pebble/internal/private" "github.com/cockroachdb/pebble/internal/rangedel" "github.com/cockroachdb/pebble/internal/rangekey" + "github.com/cockroachdb/pebble/objstorage" "github.com/cockroachdb/pebble/sstable" "github.com/cockroachdb/pebble/vfs" ) @@ -267,19 +268,19 @@ func (u *userKeyChangeSplitter) onNewOutput(key *InternalKey) []byte { return u.splitter.onNewOutput(key) } -// compactionFile is a vfs.File wrapper that, on every write, updates a metric -// in `versions` on bytes written by in-progress compactions so far. It also -// increments a per-compaction `written` int. -type compactionFile struct { - vfs.File +// compactionWritable is a objstorage.Writable wrapper that, on every write, +// updates a metric in `versions` on bytes written by in-progress compactions so +// far. It also increments a per-compaction `written` int. +type compactionWritable struct { + objstorage.Writable versions *versionSet written *int64 } -// Write implements the io.Writer interface. -func (c *compactionFile) Write(p []byte) (n int, err error) { - n, err = c.File.Write(p) +// Write is part of the objstorage.Writable interface. +func (c *compactionWritable) Write(p []byte) (n int, err error) { + n, err = c.Writable.Write(p) if err != nil { return n, err } @@ -2264,6 +2265,14 @@ func (d *DB) runCompaction( } }() + // TODO(radu): this should be created once, at a higher level. + provider := objstorage.New(objstorage.Settings{ + FS: d.opts.FS, + FSDirName: d.dirname, + NoSyncOnClose: d.opts.NoSyncOnClose, + BytesPerSync: d.opts.BytesPerSync, + }) + // Check for a delete-only compaction. This can occur when wide range // tombstones completely contain sstables. if c.kind == compactionKindDeleteOnly { @@ -2341,8 +2350,8 @@ func (d *DB) runCompaction( c.elideRangeTombstone, d.FormatMajorVersion()) var ( - filenames []string - tw *sstable.Writer + createdFiles []base.FileNum + tw *sstable.Writer ) defer func() { if iter != nil { @@ -2352,8 +2361,8 @@ func (d *DB) runCompaction( retErr = firstError(retErr, tw.Close()) } if retErr != nil { - for _, filename := range filenames { - d.opts.FS.Remove(filename) + for _, fileNum := range createdFiles { + _ = provider.Remove(fileTypeTable, fileNum) } } for _, closer := range c.closers { @@ -2428,11 +2437,11 @@ func (d *DB) runCompaction( pendingOutputs = append(pendingOutputs, fileMeta) d.mu.Unlock() - filename := base.MakeFilepath(d.opts.FS, d.dirname, fileTypeTable, fileNum) - file, err := d.opts.FS.Create(filename) + writable, err := provider.Create(fileTypeTable, fileNum) if err != nil { return err } + reason := "flushing" if c.flushing == nil { reason = "compacting" @@ -2440,19 +2449,15 @@ func (d *DB) runCompaction( d.opts.EventListener.TableCreated(TableCreateInfo{ JobID: jobID, Reason: reason, - Path: filename, + Path: provider.Path(fileTypeTable, fileNum), FileNum: fileNum, }) - file = vfs.NewSyncingFile(file, vfs.SyncingFileOptions{ - NoSyncOnClose: d.opts.NoSyncOnClose, - BytesPerSync: d.opts.BytesPerSync, - }) - file = &compactionFile{ - File: file, + writable = &compactionWritable{ + Writable: writable, versions: d.mu.versions, written: &c.bytesWritten, } - filenames = append(filenames, filename) + createdFiles = append(createdFiles, fileNum) cacheOpts := private.SSTableCacheOpts(d.cacheID, fileNum).(sstable.WriterOption) internalTableOpt := private.SSTableInternalTableOpt.(sstable.WriterOption) @@ -2464,7 +2469,7 @@ func (d *DB) runCompaction( d.opts.Experimental.MaxWriterConcurrency > 0 && (cpuWorkHandle.Permitted() || d.opts.Experimental.ForceWriterParallelism) - tw = sstable.NewWriter(file, writerOpts, cacheOpts, internalTableOpt, &prevPointKey) + tw = sstable.NewWriter(writable, writerOpts, cacheOpts, internalTableOpt, &prevPointKey) fileMeta.CreationTime = time.Now().Unix() ve.NewFiles = append(ve.NewFiles, newFileEntry{ diff --git a/compaction_test.go b/compaction_test.go index 9d6683943c..948bd18afb 100644 --- a/compaction_test.go +++ b/compaction_test.go @@ -28,6 +28,7 @@ import ( "github.com/cockroachdb/pebble/internal/errorfs" "github.com/cockroachdb/pebble/internal/keyspan" "github.com/cockroachdb/pebble/internal/manifest" + "github.com/cockroachdb/pebble/objstorage" "github.com/cockroachdb/pebble/sstable" "github.com/cockroachdb/pebble/vfs" "github.com/stretchr/testify/require" @@ -994,10 +995,11 @@ func TestCompaction(t *testing.T) { } ss := []string(nil) v := d.mu.versions.currentVersion() + provider := objstorage.New(objstorage.DefaultSettings(mem, "" /* dirName */)) for _, levelMetadata := range v.Levels { iter := levelMetadata.Iter() for meta := iter.First(); meta != nil; meta = iter.Next() { - f, err := mem.Open(base.MakeFilepath(mem, "", fileTypeTable, meta.FileNum)) + f, err := provider.OpenForReading(base.FileTypeTable, meta.FileNum) if err != nil { return "", "", errors.WithStack(err) } diff --git a/external_iterator.go b/external_iterator.go index 2586028584..449c9dbc61 100644 --- a/external_iterator.go +++ b/external_iterator.go @@ -318,7 +318,11 @@ func openExternalTables( ) (readers []*sstable.Reader, err error) { readers = make([]*sstable.Reader, 0, len(files)) for i := range files { - r, err := sstable.NewReader(files[i], readerOpts, extraReaderOpts...) + readable, err := sstable.NewSimpleReadable(files[i]) + if err != nil { + return readers, err + } + r, err := sstable.NewReader(readable, readerOpts, extraReaderOpts...) if err != nil { return readers, err } diff --git a/external_iterator_test.go b/external_iterator_test.go index ab35362730..21eaf4c2c9 100644 --- a/external_iterator_test.go +++ b/external_iterator_test.go @@ -227,7 +227,10 @@ func TestIterRandomizedMaybeFilteredKeys(t *testing.T) { f1, err := mem.Open(filename) require.NoError(t, err) - r, err := sstable.NewReader(f1, sstable.ReaderOptions{ + readable, err := sstable.NewSimpleReadable(f1) + require.NoError(t, err) + + r, err := sstable.NewReader(readable, sstable.ReaderOptions{ Cache: c, Comparer: testkeys.Comparer, }) diff --git a/ingest.go b/ingest.go index 3ff7aee0c3..476135b3f4 100644 --- a/ingest.go +++ b/ingest.go @@ -47,18 +47,18 @@ func ingestValidateKey(opts *Options, key *InternalKey) error { func ingestLoad1( opts *Options, fmv FormatMajorVersion, path string, cacheID uint64, fileNum FileNum, ) (*fileMetadata, error) { - stat, err := opts.FS.Stat(path) + f, err := opts.FS.Open(path) if err != nil { return nil, err } - f, err := opts.FS.Open(path) + readable, err := sstable.NewSimpleReadable(f) if err != nil { return nil, err } cacheOpts := private.SSTableCacheOpts(cacheID, fileNum).(sstable.ReaderOption) - r, err := sstable.NewReader(f, opts.MakeReaderOptions(), cacheOpts) + r, err := sstable.NewReader(readable, opts.MakeReaderOptions(), cacheOpts) if err != nil { return nil, err } @@ -78,7 +78,7 @@ func ingestLoad1( meta := &fileMetadata{} meta.FileNum = fileNum - meta.Size = uint64(stat.Size()) + meta.Size = uint64(readable.Size()) meta.CreationTime = time.Now().Unix() // Avoid loading into the table cache for collecting stats if we diff --git a/ingest_test.go b/ingest_test.go index 3f3b4fa011..168db26595 100644 --- a/ingest_test.go +++ b/ingest_test.go @@ -1622,9 +1622,11 @@ func TestIngestValidation(t *testing.T) { defer func() { require.NoError(t, d.Close()) }() corrupt := func(f vfs.File) { + readable, err := sstable.NewSimpleReadable(f) + require.NoError(t, err) // Compute the layout of the sstable in order to find the // appropriate block locations to corrupt. - r, err := sstable.NewReader(f, sstable.ReaderOptions{}) + r, err := sstable.NewReader(readable, sstable.ReaderOptions{}) require.NoError(t, err) l, err := r.Layout() require.NoError(t, err) diff --git a/level_checker_test.go b/level_checker_test.go index a57a17d1b0..e5fb05eb10 100644 --- a/level_checker_test.go +++ b/level_checker_test.go @@ -203,8 +203,12 @@ func TestCheckLevelsCornerCases(t *testing.T) { if err != nil { return err.Error() } + readable, err := sstable.NewSimpleReadable(f) + if err != nil { + return err.Error() + } cacheOpts := private.SSTableCacheOpts(0, fileNum-1).(sstable.ReaderOption) - r, err := sstable.NewReader(f, sstable.ReaderOptions{}, cacheOpts) + r, err := sstable.NewReader(readable, sstable.ReaderOptions{}, cacheOpts) if err != nil { return err.Error() } diff --git a/level_iter_test.go b/level_iter_test.go index ed4fc7a237..893aba6c54 100644 --- a/level_iter_test.go +++ b/level_iter_test.go @@ -251,7 +251,11 @@ func (lt *levelIterTest) runBuild(d *datadriven.TestData) string { if err != nil { return err.Error() } - r, err := sstable.NewReader(f1, sstable.ReaderOptions{ + readable, err := sstable.NewSimpleReadable(f1) + if err != nil { + return err.Error() + } + r, err := sstable.NewReader(readable, sstable.ReaderOptions{ Filters: map[string]FilterPolicy{ fp.Name(): fp, }, @@ -479,7 +483,11 @@ func buildLevelIterTables( if err != nil { b.Fatal(err) } - readers[i], err = sstable.NewReader(f, opts) + readable, err := sstable.NewSimpleReadable(f) + if err != nil { + b.Fatal(err) + } + readers[i], err = sstable.NewReader(readable, opts) if err != nil { b.Fatal(err) } diff --git a/merging_iter_test.go b/merging_iter_test.go index 16352f2b92..3f888a1005 100644 --- a/merging_iter_test.go +++ b/merging_iter_test.go @@ -234,7 +234,11 @@ func TestMergingIterCornerCases(t *testing.T) { if err != nil { return err.Error() } - r, err := sstable.NewReader(f, sstable.ReaderOptions{}) + readable, err := sstable.NewSimpleReadable(f) + if err != nil { + return err.Error() + } + r, err := sstable.NewReader(readable, sstable.ReaderOptions{}) if err != nil { return err.Error() } @@ -327,7 +331,11 @@ func buildMergingIterTables( if err != nil { b.Fatal(err) } - readers[i], err = sstable.NewReader(f, opts) + readable, err := sstable.NewSimpleReadable(f) + if err != nil { + b.Fatal(err) + } + readers[i], err = sstable.NewReader(readable, opts) if err != nil { b.Fatal(err) } @@ -563,7 +571,11 @@ func buildLevelsForMergingIterSeqSeek( if err != nil { b.Fatal(err) } - r, err := sstable.NewReader(f, opts) + readable, err := sstable.NewSimpleReadable(f) + if err != nil { + b.Fatal(err) + } + r, err := sstable.NewReader(readable, opts) if err != nil { b.Fatal(err) } diff --git a/objstorage/noop_readahead.go b/objstorage/noop_readahead.go new file mode 100644 index 0000000000..2e32413456 --- /dev/null +++ b/objstorage/noop_readahead.go @@ -0,0 +1,29 @@ +// Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +package objstorage + +import "io" + +// NoopReadaheadHandle can be used by Readable implementations that don't +// support read-ahead. +type NoopReadaheadHandle struct { + io.ReaderAt +} + +// MakeNoopReadaheadHandle initializes a NoopReadaheadHandle. +func MakeNoopReadaheadHandle(r io.ReaderAt) NoopReadaheadHandle { + return NoopReadaheadHandle{ReaderAt: r} +} + +var _ ReadaheadHandle = (*NoopReadaheadHandle)(nil) + +// Close is part of the ReadaheadHandle interface. +func (*NoopReadaheadHandle) Close() error { return nil } + +// MaxReadahead is part of the ReadaheadHandle interface. +func (*NoopReadaheadHandle) MaxReadahead() {} + +// RecordCacheHit is part of the ReadaheadHandle interface. +func (*NoopReadaheadHandle) RecordCacheHit(offset, size int64) {} diff --git a/objstorage/provider.go b/objstorage/provider.go new file mode 100644 index 0000000000..47fa48c1b0 --- /dev/null +++ b/objstorage/provider.go @@ -0,0 +1,144 @@ +// Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +package objstorage + +import ( + "io" + + "github.com/cockroachdb/pebble/internal/base" + "github.com/cockroachdb/pebble/vfs" +) + +// Provider is a singleton object used to access and manage objects. +// +// An object is conceptually like a large immutable file. The main use of +// objects is for storing sstables; in the future it could also be used for blob +// storage. +// +// Objects are currently backed by a vfs.File. +type Provider struct { + st Settings + + // TODO(radu): to we support shared storage as well, this object will need to + // maintain a FileNum to backend type mapping. + + // TODO(radu): add more functionality around listing, copying, linking, etc. +} + +// Readable is the handle for an object that is open for reading. +type Readable interface { + io.ReaderAt + io.Closer + + // Size returns the size of the object. + Size() int64 + + // NewReadaheadHandle creates a read-ahead handle which encapsulates + // read-ahead state. To benefit from read-ahead, ReadaheadHandle.ReadAt must + // be used (as opposed to Readable.ReadAt). + // + // The ReadaheadHandle must be closed before the Readable is closed. + // + // Multiple separate ReadaheadHandles can be used. + NewReadaheadHandle() ReadaheadHandle +} + +// ReadaheadHandle is used to perform reads that might benefit from read-ahead. +type ReadaheadHandle interface { + io.ReaderAt + io.Closer + + // MaxReadahead configures the implementation to expect large sequential + // reads. Used to skip any initial read-ahead ramp-up. + MaxReadahead() + + // RecordCacheHit informs the implementation that we were able to retrieve a + // block from cache. + RecordCacheHit(offset, size int64) +} + +// Writable is the handle for an object that is open for writing. +type Writable interface { + // Unlike the specification for io.Writer.Write(), the Writable.Write() + // method *is* allowed to modify the slice passed in, whether temporarily + // or permanently. Callers of Write() need to take this into account. + io.Writer + io.Closer + + Sync() error +} + +// Settings that must be specified when creating the Provider. +type Settings struct { + // Local filesystem configuration. + FS vfs.FS + FSDirName string + + // NoSyncOnClose decides whether the implementation will enforce a + // close-time synchronization (e.g., fdatasync() or sync_file_range()) + // on files it writes to. Setting this to true removes the guarantee for a + // sync on close. Some implementations can still issue a non-blocking sync. + NoSyncOnClose bool + + // BytesPerSync enables periodic syncing of files in order to smooth out + // writes to disk. This option does not provide any persistence guarantee, but + // is used to avoid latency spikes if the OS automatically decides to write + // out a large chunk of dirty filesystem buffers. + BytesPerSync int +} + +// DefaultSettings initializes default settings, suitable for tests and tools. +func DefaultSettings(fs vfs.FS, dirName string) Settings { + return Settings{ + FS: fs, + FSDirName: dirName, + NoSyncOnClose: false, + BytesPerSync: 512 * 1024, // 512KB + } +} + +// New creates the Provider. +func New(settings Settings) *Provider { + return &Provider{ + st: settings, + } +} + +// Path returns an internal path for an object. It is used for informative +// purposes (e.g. logging). +func (p *Provider) Path(fileType base.FileType, fileNum base.FileNum) string { + return base.MakeFilepath(p.st.FS, p.st.FSDirName, fileType, fileNum) +} + +// OpenForReading opens an existing object. +func (p *Provider) OpenForReading(fileType base.FileType, fileNum base.FileNum) (Readable, error) { + filename := p.Path(fileType, fileNum) + file, err := p.st.FS.Open(filename, vfs.RandomReadsOption) + if err != nil { + return nil, err + } + if fd := file.Fd(); fd != vfs.InvalidFd { + return newFileReadable(file, fd, p.st.FS, filename) + } + return newGenericFileReadable(file) +} + +// Create creates a new object and opens it for writing. +func (p *Provider) Create(fileType base.FileType, fileNum base.FileNum) (Writable, error) { + file, err := p.st.FS.Create(p.Path(fileType, fileNum)) + if err != nil { + return nil, err + } + file = vfs.NewSyncingFile(file, vfs.SyncingFileOptions{ + NoSyncOnClose: p.st.NoSyncOnClose, + BytesPerSync: p.st.BytesPerSync, + }) + return newFileBufferedWritable(file), nil +} + +// Remove removes an object. +func (p *Provider) Remove(fileType base.FileType, fileNum base.FileNum) error { + return p.st.FS.Remove(p.Path(fileType, fileNum)) +} diff --git a/objstorage/readahead.go b/objstorage/readahead.go new file mode 100644 index 0000000000..963ee7e2ff --- /dev/null +++ b/objstorage/readahead.go @@ -0,0 +1,194 @@ +// Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +package objstorage + +const ( + // Constants for dynamic readahead of data blocks. Note that the size values + // make sense as some multiple of the default block size; and they should + // both be larger than the default block size. + minFileReadsForReadahead = 2 + // TODO(bilal): Have the initial size value be a factor of the block size, + // as opposed to a hardcoded value. + initialReadaheadSize = 64 << 10 /* 64KB */ + maxReadaheadSize = 256 << 10 /* 256KB */ +) + +// readaheadState contains state variables related to readahead. Updated on +// file reads. +type readaheadState struct { + // Number of sequential reads. + numReads int64 + // Size issued to the next call to Prefetch. Starts at or above + // initialReadaheadSize and grows exponentially until maxReadaheadSize. + size int64 + // prevSize is the size used in the last Prefetch call. + prevSize int64 + // The byte offset up to which the OS has been asked to read ahead / cached. + // When reading ahead, reads up to this limit should not incur an IO + // operation. Reads after this limit can benefit from a new call to + // Prefetch. + limit int64 +} + +func (rs *readaheadState) recordCacheHit(offset, blockLength int64) { + currentReadEnd := offset + blockLength + if rs.numReads >= minFileReadsForReadahead { + if currentReadEnd >= rs.limit && offset <= rs.limit+maxReadaheadSize { + // This is a read that would have resulted in a readahead, had it + // not been a cache hit. + rs.limit = currentReadEnd + return + } + if currentReadEnd < rs.limit-rs.prevSize || offset > rs.limit+maxReadaheadSize { + // We read too far away from rs.limit to benefit from readahead in + // any scenario. Reset all variables. + rs.numReads = 1 + rs.limit = currentReadEnd + rs.size = initialReadaheadSize + rs.prevSize = 0 + return + } + // Reads in the range [rs.limit - rs.prevSize, rs.limit] end up + // here. This is a read that is potentially benefitting from a past + // readahead. + return + } + if currentReadEnd >= rs.limit && offset <= rs.limit+maxReadaheadSize { + // Blocks are being read sequentially and would benefit from readahead + // down the line. + rs.numReads++ + return + } + // We read too far ahead of the last read, or before it. This indicates + // a random read, where readahead is not desirable. Reset all variables. + rs.numReads = 1 + rs.limit = currentReadEnd + rs.size = initialReadaheadSize + rs.prevSize = 0 +} + +// maybeReadahead updates state and determines whether to issue a readahead / +// prefetch call for a block read at offset for blockLength bytes. +// Returns a size value (greater than 0) that should be prefetched if readahead +// would be beneficial. +func (rs *readaheadState) maybeReadahead(offset, blockLength int64) int64 { + currentReadEnd := offset + blockLength + if rs.numReads >= minFileReadsForReadahead { + // The minimum threshold of sequential reads to justify reading ahead + // has been reached. + // There are two intervals: the interval being read: + // [offset, currentReadEnd] + // as well as the interval where a read would benefit from read ahead: + // [rs.limit, rs.limit + rs.size] + // We increase the latter interval to + // [rs.limit, rs.limit + maxReadaheadSize] to account for cases where + // readahead may not be beneficial with a small readahead size, but over + // time the readahead size would increase exponentially to make it + // beneficial. + if currentReadEnd >= rs.limit && offset <= rs.limit+maxReadaheadSize { + // We are doing a read in the interval ahead of + // the last readahead range. In the diagrams below, ++++ is the last + // readahead range, ==== is the range represented by + // [rs.limit, rs.limit + maxReadaheadSize], and ---- is the range + // being read. + // + // rs.limit rs.limit + maxReadaheadSize + // ++++++++++|===========================| + // + // |-------------| + // offset currentReadEnd + // + // This case is also possible, as are all cases with an overlap + // between [rs.limit, rs.limit + maxReadaheadSize] and [offset, + // currentReadEnd]: + // + // rs.limit rs.limit + maxReadaheadSize + // ++++++++++|===========================| + // + // |-------------| + // offset currentReadEnd + // + // + rs.numReads++ + rs.limit = offset + rs.size + rs.prevSize = rs.size + // Increase rs.size for the next read. + rs.size *= 2 + if rs.size > maxReadaheadSize { + rs.size = maxReadaheadSize + } + return rs.prevSize + } + if currentReadEnd < rs.limit-rs.prevSize || offset > rs.limit+maxReadaheadSize { + // The above conditional has rs.limit > rs.prevSize to confirm that + // rs.limit - rs.prevSize would not underflow. + // We read too far away from rs.limit to benefit from readahead in + // any scenario. Reset all variables. + // The case where we read too far ahead: + // + // (rs.limit - rs.prevSize) (rs.limit) (rs.limit + maxReadaheadSize) + // |+++++++++++++|=============| + // + // |-------------| + // offset currentReadEnd + // + // Or too far behind: + // + // (rs.limit - rs.prevSize) (rs.limit) (rs.limit + maxReadaheadSize) + // |+++++++++++++|=============| + // + // |-------------| + // offset currentReadEnd + // + rs.numReads = 1 + rs.limit = currentReadEnd + rs.size = initialReadaheadSize + rs.prevSize = 0 + + return 0 + } + // Reads in the range [rs.limit - rs.prevSize, rs.limit] end up + // here. This is a read that is potentially benefitting from a past + // readahead, but there's no reason to issue a readahead call at the + // moment. + // + // (rs.limit - rs.prevSize) (rs.limit + maxReadaheadSize) + // |+++++++++++++|===============| + // (rs.limit) + // + // |-------| + // offset currentReadEnd + // + rs.numReads++ + return 0 + } + if currentReadEnd >= rs.limit && offset <= rs.limit+maxReadaheadSize { + // Blocks are being read sequentially and would benefit from readahead + // down the line. + // + // (rs.limit) (rs.limit + maxReadaheadSize) + // |=============| + // + // |-------| + // offset currentReadEnd + // + rs.numReads++ + return 0 + } + // We read too far ahead of the last read, or before it. This indicates + // a random read, where readahead is not desirable. Reset all variables. + // + // (rs.limit - maxReadaheadSize) (rs.limit) (rs.limit + maxReadaheadSize) + // |+++++++++++++|=============| + // + // |-------| + // offset currentReadEnd + // + rs.numReads = 1 + rs.limit = currentReadEnd + rs.size = initialReadaheadSize + rs.prevSize = 0 + return 0 +} diff --git a/objstorage/readahead_test.go b/objstorage/readahead_test.go new file mode 100644 index 0000000000..3a128944db --- /dev/null +++ b/objstorage/readahead_test.go @@ -0,0 +1,59 @@ +// Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +package objstorage + +import ( + "fmt" + "strconv" + "strings" + "testing" + + "github.com/cockroachdb/datadriven" + "github.com/stretchr/testify/require" +) + +func TestMaybeReadahead(t *testing.T) { + var rs readaheadState + datadriven.RunTest(t, "testdata/readahead", func(t *testing.T, d *datadriven.TestData) string { + cacheHit := false + switch d.Cmd { + case "reset": + rs.size = initialReadaheadSize + rs.limit = 0 + rs.numReads = 0 + return "" + + case "cache-read": + cacheHit = true + fallthrough + case "read": + args := strings.Split(d.Input, ",") + if len(args) != 2 { + return "expected 2 args: offset, size" + } + + offset, err := strconv.ParseInt(strings.TrimSpace(args[0]), 10, 64) + require.NoError(t, err) + size, err := strconv.ParseInt(strings.TrimSpace(args[1]), 10, 64) + require.NoError(t, err) + var raSize int64 + if cacheHit { + rs.recordCacheHit(offset, size) + } else { + raSize = rs.maybeReadahead(offset, size) + } + + var buf strings.Builder + fmt.Fprintf(&buf, "readahead: %d\n", raSize) + fmt.Fprintf(&buf, "numReads: %d\n", rs.numReads) + fmt.Fprintf(&buf, "size: %d\n", rs.size) + fmt.Fprintf(&buf, "prevSize: %d\n", rs.prevSize) + fmt.Fprintf(&buf, "limit: %d", rs.limit) + return buf.String() + default: + return fmt.Sprintf("unknown command: %s", d.Cmd) + } + }) +} diff --git a/sstable/testdata/readahead b/objstorage/testdata/readahead similarity index 100% rename from sstable/testdata/readahead rename to objstorage/testdata/readahead diff --git a/objstorage/vfs_readable.go b/objstorage/vfs_readable.go new file mode 100644 index 0000000000..9c5be93b54 --- /dev/null +++ b/objstorage/vfs_readable.go @@ -0,0 +1,209 @@ +// Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +package objstorage + +import ( + "fmt" + "os" + "sync" + + "github.com/cockroachdb/pebble/internal/invariants" + "github.com/cockroachdb/pebble/vfs" +) + +// fileReadable implements objstorage.Readable on top of a vfs.File that is backed +// by an OS descriptor. +type fileReadable struct { + file vfs.File + fd uintptr + size int64 + + // The following fields are used to possibly open the file again using the + // sequential reads option (see readaheadHandle). + filename string + fs vfs.FS +} + +var _ Readable = (*fileReadable)(nil) + +func newFileReadable(file vfs.File, fd uintptr, fs vfs.FS, filename string) (*fileReadable, error) { + info, err := file.Stat() + if err != nil { + return nil, err + } + r := &fileReadable{ + file: file, + fd: fd, + size: info.Size(), + filename: filename, + fs: fs, + } + invariants.SetFinalizer(r, func(obj interface{}) { + if obj.(*fileReadable).file != nil { + fmt.Fprintf(os.Stderr, "Readable was not closed") + os.Exit(1) + } + }) + return r, nil +} + +// ReadAt is part of the objstorage.Readable interface. +func (r *fileReadable) ReadAt(p []byte, off int64) (n int, err error) { + return r.file.ReadAt(p, off) +} + +// Close is part of the objstorage.Readable interface. +func (r *fileReadable) Close() error { + defer func() { r.file = nil }() + return r.file.Close() +} + +// Size is part of the objstorage.Readable interface. +func (r *fileReadable) Size() int64 { + return r.size +} + +// NewReadaheadHandle is part of the objstorage.Readable interface. +func (r *fileReadable) NewReadaheadHandle() ReadaheadHandle { + rh := readaheadHandlePool.Get().(*readaheadHandle) + rh.r = r + return rh +} + +type readaheadHandle struct { + r *fileReadable + rs readaheadState + + // sequentialFile holds a file descriptor to the same underlying File, + // except with fadvise(FADV_SEQUENTIAL) called on it to take advantage of + // OS-level readahead. Once this is non-nil, the other variables in + // readaheadState don't matter much as we defer to OS-level readahead. + sequentialFile vfs.File +} + +var _ ReadaheadHandle = (*readaheadHandle)(nil) + +var readaheadHandlePool = sync.Pool{ + New: func() interface{} { + i := &readaheadHandle{} + // Note: this is a no-op if invariants are disabled or race is enabled. + invariants.SetFinalizer(i, func(obj interface{}) { + if obj.(*readaheadHandle).r != nil { + fmt.Fprintf(os.Stderr, "ReadaheadHandle was not closed") + os.Exit(1) + } + }) + return i + }, +} + +// Close is part of the objstorage.ReadaheadHandle interface. +func (rh *readaheadHandle) Close() error { + var err error + if rh.sequentialFile != nil { + err = rh.sequentialFile.Close() + } + *rh = readaheadHandle{} + readaheadHandlePool.Put(rh) + return err +} + +// ReadAt is part of the objstorage.ReadaheadHandle interface. +func (rh *readaheadHandle) ReadAt(p []byte, offset int64) (n int, err error) { + if rh.sequentialFile != nil { + // Use OS-level read-ahead. + return rh.sequentialFile.ReadAt(p, offset) + } + if readaheadSize := rh.rs.maybeReadahead(offset, int64(len(p))); readaheadSize > 0 { + if readaheadSize >= maxReadaheadSize { + // We've reached the maximum readahead size. Beyond this point, rely on + // OS-level readahead. + rh.MaxReadahead() + } else { + _ = vfs.Prefetch(rh.r.fd, uint64(offset), uint64(readaheadSize)) + } + } + return rh.r.file.ReadAt(p, offset) +} + +// MaxReadahead is part of the objstorage.ReadaheadHandle interface. +func (rh *readaheadHandle) MaxReadahead() { + if rh.sequentialFile != nil { + return + } + + // TODO(radu): we could share the reopened file descriptor across multiple + // handles. + f, err := rh.r.fs.Open(rh.r.filename, vfs.SequentialReadsOption) + if err == nil { + rh.sequentialFile = f + } +} + +// RecordCacheHit is part of the objstorage.ReadaheadHandle interface. +func (rh *readaheadHandle) RecordCacheHit(offset, size int64) { + if rh.sequentialFile != nil { + // Using OS-level readahead, so do nothing. + return + } + rh.rs.recordCacheHit(offset, size) +} + +// genericFileReadable implements objstorage.Readable on top of any vfs.File. +// This implementation does not support read-ahead. +type genericFileReadable struct { + file vfs.File + size int64 + + rh NoopReadaheadHandle +} + +var _ Readable = (*genericFileReadable)(nil) + +func newGenericFileReadable(file vfs.File) (*genericFileReadable, error) { + info, err := file.Stat() + if err != nil { + return nil, err + } + r := &genericFileReadable{ + file: file, + size: info.Size(), + rh: MakeNoopReadaheadHandle(file), + } + invariants.SetFinalizer(r, func(obj interface{}) { + if obj.(*genericFileReadable).file != nil { + fmt.Fprintf(os.Stderr, "Readable was not closed") + os.Exit(1) + } + }) + return r, nil +} + +// ReadAt is part of the objstorage.Readable interface. +func (r *genericFileReadable) ReadAt(p []byte, off int64) (n int, err error) { + return r.file.ReadAt(p, off) +} + +// Close is part of the objstorage.Readable interface. +func (r *genericFileReadable) Close() error { + defer func() { r.file = nil }() + return r.file.Close() +} + +// Size is part of the objstorage.Readable interface. +func (r *genericFileReadable) Size() int64 { + return r.size +} + +// NewReadaheadHandle is part of the objstorage.Readable interface. +func (r *genericFileReadable) NewReadaheadHandle() ReadaheadHandle { + return &r.rh +} + +// TestingCheckMaxReadahead returns true if the ReadaheadHandle has switched to +// OS-level read-ahead. +func TestingCheckMaxReadahead(rh ReadaheadHandle) bool { + return rh.(*readaheadHandle).sequentialFile != nil +} diff --git a/objstorage/vfs_writable.go b/objstorage/vfs_writable.go new file mode 100644 index 0000000000..d3a5280d73 --- /dev/null +++ b/objstorage/vfs_writable.go @@ -0,0 +1,43 @@ +// Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +package objstorage + +import ( + "bufio" + + "github.com/cockroachdb/pebble/vfs" +) + +type fileBufferedWritable struct { + file vfs.File + bw *bufio.Writer +} + +var _ Writable = (*fileBufferedWritable)(nil) + +func newFileBufferedWritable(file vfs.File) *fileBufferedWritable { + return &fileBufferedWritable{ + file: file, + bw: bufio.NewWriter(file), + } +} + +// Write is part of the objstorage.Writable interface. +func (w *fileBufferedWritable) Write(p []byte) (n int, err error) { + return w.bw.Write(p) +} + +// Sync is part of the objstorage.Writable interface. +func (w *fileBufferedWritable) Sync() error { + if err := w.bw.Flush(); err != nil { + return err + } + return w.file.Sync() +} + +// Close is part of the objstorage.Writable interface. +func (w *fileBufferedWritable) Close() error { + return w.file.Close() +} diff --git a/replay/replay.go b/replay/replay.go index 6416c1c6b5..26112fd860 100644 --- a/replay/replay.go +++ b/replay/replay.go @@ -785,7 +785,12 @@ func loadFlushedSSTableKeys( if err != nil { return err } - r, err := sstable.NewReader(f, readOpts) + readable, err := sstable.NewSimpleReadable(f) + if err != nil { + f.Close() + return err + } + r, err := sstable.NewReader(readable, readOpts) if err != nil { return err } diff --git a/sstable/data_test.go b/sstable/data_test.go index 33ac5c5bcb..7776a1d67a 100644 --- a/sstable/data_test.go +++ b/sstable/data_test.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/pebble/internal/cache" "github.com/cockroachdb/pebble/internal/keyspan" "github.com/cockroachdb/pebble/internal/testkeys" + "github.com/cockroachdb/pebble/objstorage" "github.com/cockroachdb/pebble/vfs" ) @@ -170,7 +171,9 @@ func runBuildRawCmd( td *datadriven.TestData, opts *WriterOptions, ) (*WriterMetadata, *Reader, error) { mem := vfs.NewMem() - f0, err := mem.Create("test") + provider := objstorage.New(objstorage.DefaultSettings(mem, "" /* dirName */)) + + f0, err := provider.Create(base.FileTypeTable, 0 /* fileNum */) if err != nil { return nil, nil, err } @@ -217,7 +220,7 @@ func runBuildRawCmd( return nil, nil, err } - f1, err := mem.Open("test") + f1, err := provider.OpenForReading(base.FileTypeTable, 0 /* fileNum */) if err != nil { return nil, nil, err } diff --git a/sstable/properties_test.go b/sstable/properties_test.go index 56b189ab01..583d3fce1c 100644 --- a/sstable/properties_test.go +++ b/sstable/properties_test.go @@ -48,7 +48,7 @@ func TestPropertiesLoad(t *testing.T) { f, err := os.Open(filepath.FromSlash("testdata/h.sst")) require.NoError(t, err) - r, err := NewReader(f, ReaderOptions{}) + r, err := newReader(f, ReaderOptions{}) require.NoError(t, err) defer r.Close() diff --git a/sstable/reader.go b/sstable/reader.go index 768d3f6f7e..042e1b6795 100644 --- a/sstable/reader.go +++ b/sstable/reader.go @@ -23,23 +23,12 @@ import ( "github.com/cockroachdb/pebble/internal/invariants" "github.com/cockroachdb/pebble/internal/keyspan" "github.com/cockroachdb/pebble/internal/private" - "github.com/cockroachdb/pebble/vfs" + "github.com/cockroachdb/pebble/objstorage" ) var errCorruptIndexEntry = base.CorruptionErrorf("pebble/table: corrupt index entry") var errReaderClosed = errors.New("pebble/table: reader is closed") -const ( - // Constants for dynamic readahead of data blocks. Note that the size values - // make sense as some multiple of the default block size; and they should - // both be larger than the default block size. - minFileReadsForReadahead = 2 - // TODO(bilal): Have the initial size value be a factor of the block size, - // as opposed to a hardcoded value. - initialReadaheadSize = 64 << 10 /* 64KB */ - maxReadaheadSize = 256 << 10 /* 256KB */ -) - // decodeBlockHandle returns the block handle encoded at the start of src, as // well as the number of bytes it occupies. It returns zero if given invalid // input. A block handle for a data block or a first/lower level index block @@ -209,7 +198,7 @@ type singleLevelIterator struct { reader *Reader index blockIter data blockIter - dataRS readaheadState + dataRH objstorage.ReadaheadHandle // dataBH refers to the last data block that the iterator considered // loading. It may not actually have loaded the block, due to an error or // because it was considered irrelevant. @@ -416,7 +405,7 @@ func (i *singleLevelIterator) init( _ = i.index.Close() return err } - i.dataRS.size = initialReadaheadSize + i.dataRH = r.readable.NewReadaheadHandle() if r.tableFormat == TableFormatPebblev3 { if r.Properties.NumValueBlocks > 0 { i.vbReader = &valueBlockReader{ @@ -435,14 +424,7 @@ func (i *singleLevelIterator) init( // setupForCompaction sets up the singleLevelIterator for use with compactionIter. // Currently, it skips readahead ramp-up. It should be called after init is called. func (i *singleLevelIterator) setupForCompaction() { - if i.reader.fs != nil { - f, err := i.reader.fs.Open(i.reader.filename, vfs.SequentialReadsOption) - if err == nil { - // Given that this iterator is for a compaction, we can assume that it - // will be read sequentially and we can skip the readahead ramp-up. - i.dataRS.sequentialFile = f - } - } + i.dataRH.MaxReadahead() } func (i *singleLevelIterator) resetForReuse() singleLevelIterator { @@ -530,7 +512,7 @@ func (i *singleLevelIterator) loadBlock(dir int8) loadBlockResult { } // blockIntersects } - block, err := i.readBlockWithStats(i.dataBH, &i.dataRS) + block, err := i.readBlockWithStats(i.dataBH, i.dataRH) if err != nil { i.err = err return loadBlockFailed @@ -546,11 +528,12 @@ func (i *singleLevelIterator) loadBlock(dir int8) loadBlockResult { } // readBlockForVBR implements the blockProviderWhenOpen interface for use by -// the valueBlockReader. We could use a readaheadState for this (that would be -// different from the readaheadState for the data blocks), but choose to use -// nil since (a) for user-facing reads we expect access to the value blocks to -// be rare, (b) for compactions, we are not using the logic in readaheadState -// and deferring to OS-level readahead. +// the valueBlockReader. +// +// TODO(radu, sumeer): we should use a ReadaheadHandle here, separate from the +// ReadHandle for the data blocks. Especially for the compaction case, where we +// are reading the value blocks. For user-facing reads, this may be less +// necessary, under the assumption that value blocks are rarely read. func (i *singleLevelIterator) readBlockForVBR( h BlockHandle, stats *base.InternalIteratorStats, ) (cache.Handle, error) { @@ -625,9 +608,9 @@ func (i *singleLevelIterator) resolveMaybeExcluded(dir int8) intersectsResult { } func (i *singleLevelIterator) readBlockWithStats( - bh BlockHandle, raState *readaheadState, + bh BlockHandle, rh objstorage.ReadaheadHandle, ) (cache.Handle, error) { - return i.reader.readBlock(bh, nil /* transform */, raState, i.stats) + return i.reader.readBlock(bh, nil /* transform */, rh, i.stats) } func (i *singleLevelIterator) initBoundsForAlreadyLoadedBlock() { @@ -1393,9 +1376,9 @@ func (i *singleLevelIterator) Close() error { } err = firstError(err, i.data.Close()) err = firstError(err, i.index.Close()) - if i.dataRS.sequentialFile != nil { - err = firstError(err, i.dataRS.sequentialFile.Close()) - i.dataRS.sequentialFile = nil + if i.dataRH != nil { + err = firstError(err, i.dataRH.Close()) + i.dataRH = nil } err = firstError(err, i.err) if i.bpfs != nil { @@ -1592,7 +1575,7 @@ func (i *twoLevelIterator) loadIndex(dir int8) loadBlockResult { } // blockIntersects } - indexBlock, err := i.readBlockWithStats(bhp.BlockHandle, nil /* readaheadState */) + indexBlock, err := i.readBlockWithStats(bhp.BlockHandle, nil /* readaheadHandle */) if err != nil { i.err = err return loadBlockFailed @@ -1696,7 +1679,7 @@ func (i *twoLevelIterator) init( _ = i.topLevelIndex.Close() return err } - i.dataRS.size = initialReadaheadSize + i.dataRH = r.readable.NewReadaheadHandle() if r.tableFormat == TableFormatPebblev3 { if r.Properties.NumValueBlocks > 0 { i.vbReader = &valueBlockReader{ @@ -2348,9 +2331,9 @@ func (i *twoLevelIterator) Close() error { err = firstError(err, i.data.Close()) err = firstError(err, i.index.Close()) err = firstError(err, i.topLevelIndex.Close()) - if i.dataRS.sequentialFile != nil { - err = firstError(err, i.dataRS.sequentialFile.Close()) - i.dataRS.sequentialFile = nil + if i.dataRH != nil { + err = firstError(err, i.dataRH.Close()) + i.dataRH = nil } err = firstError(err, i.err) if i.bpfs != nil { @@ -2470,203 +2453,6 @@ func (i *twoLevelCompactionIterator) skipForward( type blockTransform func([]byte) ([]byte, error) -// readaheadState contains state variables related to readahead. Updated on -// file reads. -type readaheadState struct { - // Number of sequential reads. - numReads int64 - // Size issued to the next call to Prefetch. Starts at or above - // initialReadaheadSize and grows exponentially until maxReadaheadSize. - size int64 - // prevSize is the size used in the last Prefetch call. - prevSize int64 - // The byte offset up to which the OS has been asked to read ahead / cached. - // When reading ahead, reads up to this limit should not incur an IO - // operation. Reads after this limit can benefit from a new call to - // Prefetch. - limit int64 - // sequentialFile holds a file descriptor to the same underlying File, - // except with fadvise(FADV_SEQUENTIAL) called on it to take advantage of - // OS-level readahead. Initialized when the iterator has been consistently - // reading blocks in a sequential access pattern. Once this is non-nil, - // the other variables in readaheadState don't matter much as we defer - // to OS-level readahead. - // - // For TableFormatPebblev3, there are potentially two different sequential - // accesses, for the data block and the value blocks. It is unclear to me - // how FADV_SEQUENTIAL will function for such accesses. We do expect that - // the average workload will have most data in data blocks. - sequentialFile vfs.File -} - -func (rs *readaheadState) recordCacheHit(offset, blockLength int64) { - currentReadEnd := offset + blockLength - if rs.sequentialFile != nil { - // Using OS-level readahead instead, so do nothing. - return - } - if rs.numReads >= minFileReadsForReadahead { - if currentReadEnd >= rs.limit && offset <= rs.limit+maxReadaheadSize { - // This is a read that would have resulted in a readahead, had it - // not been a cache hit. - rs.limit = currentReadEnd - return - } - if currentReadEnd < rs.limit-rs.prevSize || offset > rs.limit+maxReadaheadSize { - // We read too far away from rs.limit to benefit from readahead in - // any scenario. Reset all variables. - rs.numReads = 1 - rs.limit = currentReadEnd - rs.size = initialReadaheadSize - rs.prevSize = 0 - return - } - // Reads in the range [rs.limit - rs.prevSize, rs.limit] end up - // here. This is a read that is potentially benefitting from a past - // readahead. - return - } - if currentReadEnd >= rs.limit && offset <= rs.limit+maxReadaheadSize { - // Blocks are being read sequentially and would benefit from readahead - // down the line. - rs.numReads++ - return - } - // We read too far ahead of the last read, or before it. This indicates - // a random read, where readahead is not desirable. Reset all variables. - rs.numReads = 1 - rs.limit = currentReadEnd - rs.size = initialReadaheadSize - rs.prevSize = 0 -} - -// maybeReadahead updates state and determines whether to issue a readahead / -// prefetch call for a block read at offset for blockLength bytes. -// Returns a size value (greater than 0) that should be prefetched if readahead -// would be beneficial. -func (rs *readaheadState) maybeReadahead(offset, blockLength int64) int64 { - currentReadEnd := offset + blockLength - if rs.sequentialFile != nil { - // Using OS-level readahead instead, so do nothing. - return 0 - } - if rs.numReads >= minFileReadsForReadahead { - // The minimum threshold of sequential reads to justify reading ahead - // has been reached. - // There are two intervals: the interval being read: - // [offset, currentReadEnd] - // as well as the interval where a read would benefit from read ahead: - // [rs.limit, rs.limit + rs.size] - // We increase the latter interval to - // [rs.limit, rs.limit + maxReadaheadSize] to account for cases where - // readahead may not be beneficial with a small readahead size, but over - // time the readahead size would increase exponentially to make it - // beneficial. - if currentReadEnd >= rs.limit && offset <= rs.limit+maxReadaheadSize { - // We are doing a read in the interval ahead of - // the last readahead range. In the diagrams below, ++++ is the last - // readahead range, ==== is the range represented by - // [rs.limit, rs.limit + maxReadaheadSize], and ---- is the range - // being read. - // - // rs.limit rs.limit + maxReadaheadSize - // ++++++++++|===========================| - // - // |-------------| - // offset currentReadEnd - // - // This case is also possible, as are all cases with an overlap - // between [rs.limit, rs.limit + maxReadaheadSize] and [offset, - // currentReadEnd]: - // - // rs.limit rs.limit + maxReadaheadSize - // ++++++++++|===========================| - // - // |-------------| - // offset currentReadEnd - // - // - rs.numReads++ - rs.limit = offset + rs.size - rs.prevSize = rs.size - // Increase rs.size for the next read. - rs.size *= 2 - if rs.size > maxReadaheadSize { - rs.size = maxReadaheadSize - } - return rs.prevSize - } - if currentReadEnd < rs.limit-rs.prevSize || offset > rs.limit+maxReadaheadSize { - // The above conditional has rs.limit > rs.prevSize to confirm that - // rs.limit - rs.prevSize would not underflow. - // We read too far away from rs.limit to benefit from readahead in - // any scenario. Reset all variables. - // The case where we read too far ahead: - // - // (rs.limit - rs.prevSize) (rs.limit) (rs.limit + maxReadaheadSize) - // |+++++++++++++|=============| - // - // |-------------| - // offset currentReadEnd - // - // Or too far behind: - // - // (rs.limit - rs.prevSize) (rs.limit) (rs.limit + maxReadaheadSize) - // |+++++++++++++|=============| - // - // |-------------| - // offset currentReadEnd - // - rs.numReads = 1 - rs.limit = currentReadEnd - rs.size = initialReadaheadSize - rs.prevSize = 0 - return 0 - } - // Reads in the range [rs.limit - rs.prevSize, rs.limit] end up - // here. This is a read that is potentially benefitting from a past - // readahead, but there's no reason to issue a readahead call at the - // moment. - // - // (rs.limit - rs.prevSize) (rs.limit + maxReadaheadSize) - // |+++++++++++++|===============| - // (rs.limit) - // - // |-------| - // offset currentReadEnd - // - rs.numReads++ - return 0 - } - if currentReadEnd >= rs.limit && offset <= rs.limit+maxReadaheadSize { - // Blocks are being read sequentially and would benefit from readahead - // down the line. - // - // (rs.limit) (rs.limit + maxReadaheadSize) - // |=============| - // - // |-------| - // offset currentReadEnd - // - rs.numReads++ - return 0 - } - // We read too far ahead of the last read, or before it. This indicates - // a random read, where readahead is not desirable. Reset all variables. - // - // (rs.limit - maxReadaheadSize) (rs.limit) (rs.limit + maxReadaheadSize) - // |+++++++++++++|=============| - // - // |-------| - // offset currentReadEnd - // - rs.numReads = 1 - rs.limit = currentReadEnd - rs.size = initialReadaheadSize - rs.prevSize = 0 - return 0 -} - // ReaderOption provide an interface to do work on Reader while it is being // opened. type ReaderOption interface { @@ -2735,20 +2521,6 @@ func (c *cacheOpts) writerApply(w *Writer) { } } -// FileReopenOpt is specified if this reader is allowed to reopen additional -// file descriptors for this file. Used to take advantage of OS-level readahead. -type FileReopenOpt struct { - FS vfs.FS - Filename string -} - -func (f FileReopenOpt) readerApply(r *Reader) { - if r.fs == nil { - r.fs = f.FS - r.filename = f.Filename - } -} - // rawTombstonesOpt is a Reader open option for specifying that range // tombstones returned by Reader.NewRangeDelIter() should not be // fragmented. Used by debug tools to get a raw view of the tombstones @@ -2770,9 +2542,7 @@ func init() { // Reader is a table reader. type Reader struct { - file ReadableFile - fs vfs.FS - filename string + readable objstorage.Readable cacheID uint64 fileNum base.FileNum err error @@ -2803,20 +2573,14 @@ type Reader struct { func (r *Reader) Close() error { r.opts.Cache.Unref() + if r.readable != nil { + r.err = firstError(r.err, r.readable.Close()) + r.readable = nil + } + if r.err != nil { - if r.file != nil { - r.file.Close() - r.file = nil - } return r.err } - if r.file != nil { - r.err = r.file.Close() - r.file = nil - if r.err != nil { - return r.err - } - } // Make any future calls to Get, NewIter or Close return an error. r.err = errReaderClosed return nil @@ -2940,19 +2704,19 @@ func (i *rangeKeyFragmentBlockIter) Close() error { } func (r *Reader) readIndex(stats *base.InternalIteratorStats) (cache.Handle, error) { - return r.readBlock(r.indexBH, nil /* transform */, nil /* readaheadState */, stats) + return r.readBlock(r.indexBH, nil /* transform */, nil /* rh */, stats) } func (r *Reader) readFilter(stats *base.InternalIteratorStats) (cache.Handle, error) { - return r.readBlock(r.filterBH, nil /* transform */, nil /* readaheadState */, stats) + return r.readBlock(r.filterBH, nil /* transform */, nil /* readaheadHandle */, stats) } func (r *Reader) readRangeDel(stats *base.InternalIteratorStats) (cache.Handle, error) { - return r.readBlock(r.rangeDelBH, r.rangeDelTransform, nil /* readaheadState */, stats) + return r.readBlock(r.rangeDelBH, r.rangeDelTransform, nil /* readaheadHandle */, stats) } func (r *Reader) readRangeKey(stats *base.InternalIteratorStats) (cache.Handle, error) { - return r.readBlock(r.rangeKeyBH, nil /* transform */, nil /* readaheadState */, stats) + return r.readBlock(r.rangeKeyBH, nil /* transform */, nil /* readaheadHandle */, stats) } func checkChecksum( @@ -2981,12 +2745,12 @@ func checkChecksum( func (r *Reader) readBlock( bh BlockHandle, transform blockTransform, - raState *readaheadState, + readaheadHandle objstorage.ReadaheadHandle, stats *base.InternalIteratorStats, ) (_ cache.Handle, _ error) { if h := r.opts.Cache.Get(r.cacheID, r.fileNum, bh.Offset); h.Get() != nil { - if raState != nil { - raState.recordCacheHit(int64(bh.Offset), int64(bh.Length+blockTrailerLen)) + if readaheadHandle != nil { + readaheadHandle.RecordCacheHit(int64(bh.Offset), int64(bh.Length+blockTrailerLen)) } if stats != nil { stats.BlockBytes += bh.Length @@ -2994,41 +2758,16 @@ func (r *Reader) readBlock( } return h, nil } - file := r.file - - if raState != nil { - if raState.sequentialFile != nil { - file = raState.sequentialFile - } else if readaheadSize := raState.maybeReadahead(int64(bh.Offset), int64(bh.Length+blockTrailerLen)); readaheadSize > 0 { - if readaheadSize >= maxReadaheadSize { - // We've reached the maximum readahead size. Beyond this - // point, rely on OS-level readahead. Note that we can only - // reopen a new file handle with this optimization if - // r.fs != nil. This reader must have been created with the - // FileReopenOpt for this field to be set. - if r.fs != nil { - f, err := r.fs.Open(r.filename, vfs.SequentialReadsOption) - if err == nil { - // Use this new file handle for all sequential reads by - // this iterator going forward. - raState.sequentialFile = f - file = f - } - } - } - if raState.sequentialFile == nil { - if f, ok := r.file.(vfs.File); ok { - if fd := f.Fd(); fd != vfs.InvalidFd { - _ = vfs.Prefetch(fd, bh.Offset, uint64(readaheadSize)) - } - } - } - } - } v := r.opts.Cache.Alloc(int(bh.Length + blockTrailerLen)) b := v.Buf() - if _, err := file.ReadAt(b, int64(bh.Offset)); err != nil { + var err error + if readaheadHandle != nil { + _, err = readaheadHandle.ReadAt(b, int64(bh.Offset)) + } else { + _, err = r.readable.ReadAt(b, int64(bh.Offset)) + } + if err != nil { r.opts.Cache.Free(v) return cache.Handle{}, err } @@ -3119,7 +2858,7 @@ func (r *Reader) transformRangeDelV1(b []byte) ([]byte, error) { } func (r *Reader) readMetaindex(metaindexBH BlockHandle) error { - b, err := r.readBlock(metaindexBH, nil /* transform */, nil /* readaheadState */, nil /* stats */) + b, err := r.readBlock(metaindexBH, nil /* transform */, nil /* readaheadHandle */, nil /* stats */) if err != nil { return err } @@ -3161,7 +2900,7 @@ func (r *Reader) readMetaindex(metaindexBH BlockHandle) error { } if bh, ok := meta[metaPropertiesName]; ok { - b, err = r.readBlock(bh, nil /* transform */, nil /* readaheadState */, nil /* stats */) + b, err = r.readBlock(bh, nil /* transform */, nil /* readaheadHandle */, nil /* stats */) if err != nil { return err } @@ -3267,7 +3006,7 @@ func (r *Reader) Layout() (*Layout, error) { l.Index = append(l.Index, indexBH.BlockHandle) subIndex, err := r.readBlock( - indexBH.BlockHandle, nil /* transform */, nil /* readaheadState */, nil /* stats */) + indexBH.BlockHandle, nil /* transform */, nil /* readaheadHandle */, nil /* stats */) if err != nil { return nil, err } @@ -3350,9 +3089,9 @@ func (r *Reader) ValidateBlockChecksums() error { // Check all blocks sequentially. Make use of read-ahead, given we are // scanning the entire file from start to end. - blockRS := &readaheadState{ - size: initialReadaheadSize, - } + rh := r.readable.NewReadaheadHandle() + defer rh.Close() + for _, bh := range blocks { // Certain blocks may not be present, in which case we skip them. if bh.Length == 0 { @@ -3360,7 +3099,7 @@ func (r *Reader) ValidateBlockChecksums() error { } // Read the block, which validates the checksum. - h, err := r.readBlock(bh, nil /* transform */, blockRS, nil /* stats */) + h, err := r.readBlock(bh, nil /* transform */, rh, nil /* stats */) if err != nil { return err } @@ -3423,7 +3162,7 @@ func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) { return 0, errCorruptIndexEntry } startIdxBlock, err := r.readBlock( - startIdxBH.BlockHandle, nil /* transform */, nil /* readaheadState */, nil /* stats */) + startIdxBH.BlockHandle, nil /* transform */, nil /* readaheadHandle */, nil /* stats */) if err != nil { return 0, err } @@ -3444,7 +3183,7 @@ func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) { return 0, errCorruptIndexEntry } endIdxBlock, err := r.readBlock( - endIdxBH.BlockHandle, nil /* transform */, nil /* readaheadState */, nil /* stats */) + endIdxBH.BlockHandle, nil /* transform */, nil /* readaheadHandle */, nil /* stats */) if err != nil { return 0, err } @@ -3510,20 +3249,13 @@ func (r *Reader) TableFormat() (TableFormat, error) { return r.tableFormat, nil } -// ReadableFile describes subset of vfs.File required for reading SSTs. -type ReadableFile interface { - io.ReaderAt - io.Closer - Stat() (os.FileInfo, error) -} - // NewReader returns a new table reader for the file. Closing the reader will // close the file. -func NewReader(f ReadableFile, o ReaderOptions, extraOpts ...ReaderOption) (*Reader, error) { +func NewReader(f objstorage.Readable, o ReaderOptions, extraOpts ...ReaderOption) (*Reader, error) { o = o.ensureDefaults() r := &Reader{ - file: f, - opts: o, + readable: f, + opts: o, } if r.opts.Cache == nil { r.opts.Cache = cache.New(0) @@ -3685,7 +3417,7 @@ func (l *Layout) Describe( if b.name == "footer" || b.name == "leveldb-footer" { trailer, offset := make([]byte, b.Length), b.Offset - _, _ = r.file.ReadAt(trailer, int64(offset)) + _, _ = r.readable.ReadAt(trailer, int64(offset)) if b.name == "footer" { checksumType := ChecksumType(trailer[0]) @@ -3725,7 +3457,7 @@ func (l *Layout) Describe( continue } - h, err := r.readBlock(b.BlockHandle, nil /* transform */, nil /* readaheadState */, nil /* stats */) + h, err := r.readBlock(b.BlockHandle, nil /* transform */, nil /* readaheadHandle */, nil /* stats */) if err != nil { fmt.Fprintf(w, " [err: %s]\n", err) continue @@ -3757,7 +3489,7 @@ func (l *Layout) Describe( formatTrailer := func() { trailer := make([]byte, blockTrailerLen) offset := int64(b.Offset + b.Length) - _, _ = r.file.ReadAt(trailer, offset) + _, _ = r.readable.ReadAt(trailer, offset) bt := blockType(trailer[0]) checksum := binary.LittleEndian.Uint32(trailer[1:]) fmt.Fprintf(w, "%10d [trailer compression=%s checksum=0x%04x]\n", offset, bt, checksum) @@ -3885,3 +3617,44 @@ func (l *Layout) Describe( last := blocks[len(blocks)-1] fmt.Fprintf(w, "%10d EOF\n", last.Offset+last.Length) } + +// ReadableFile describes the smallest subset of objstorage.Readable that is +// required for reading SSTs. +type ReadableFile interface { + io.ReaderAt + io.Closer + Stat() (os.FileInfo, error) +} + +// NewSimpleReadable wraps a ReadableFile in a objstorage.Readable +// implementation (which does not support read-ahead) +func NewSimpleReadable(r ReadableFile) (objstorage.Readable, error) { + info, err := r.Stat() + if err != nil { + return nil, err + } + return &simpleReadable{ + ReadableFile: r, + size: info.Size(), + rh: objstorage.MakeNoopReadaheadHandle(r), + }, nil +} + +// simpleReadable wraps a ReadableFile to implement objstorage.Readable. +type simpleReadable struct { + ReadableFile + size int64 + rh objstorage.NoopReadaheadHandle +} + +var _ objstorage.Readable = (*simpleReadable)(nil) + +// Size is part of the objstorage.Readable interface. +func (s *simpleReadable) Size() int64 { + return s.size +} + +// NewReadaheadHandle is part of the objstorage.Readable interface. +func (s *simpleReadable) NewReadaheadHandle() objstorage.ReadaheadHandle { + return &s.rh +} diff --git a/sstable/reader_test.go b/sstable/reader_test.go index 018c195256..32ce338be3 100644 --- a/sstable/reader_test.go +++ b/sstable/reader_test.go @@ -13,7 +13,6 @@ import ( "os" "path" "path/filepath" - "strconv" "strings" "testing" "time" @@ -26,6 +25,7 @@ import ( "github.com/cockroachdb/pebble/internal/errorfs" "github.com/cockroachdb/pebble/internal/humanize" "github.com/cockroachdb/pebble/internal/testkeys" + "github.com/cockroachdb/pebble/objstorage" "github.com/cockroachdb/pebble/vfs" "github.com/stretchr/testify/require" "golang.org/x/exp/rand" @@ -247,7 +247,7 @@ func TestHamletReader(t *testing.T) { f, err := os.Open(filepath.FromSlash(prebuiltSST)) require.NoError(t, err) - r, err := NewReader(f, ReaderOptions{}) + r, err := newReader(f, ReaderOptions{}) require.NoError(t, err) t.Run( @@ -310,7 +310,8 @@ func TestInjectedErrors(t *testing.T) { run := func(i int) (reterr error) { f, err := os.Open(filepath.FromSlash(prebuiltSST)) require.NoError(t, err) - r, err := NewReader(errorfs.WrapFile(f, errorfs.OnIndex(int32(i))), ReaderOptions{}) + + r, err := newReader(errorfs.WrapFile(f, errorfs.OnIndex(int32(i))), ReaderOptions{}) if err != nil { return firstError(err, f.Close()) } @@ -357,15 +358,19 @@ func TestInjectedErrors(t *testing.T) { } func TestInvalidReader(t *testing.T) { + invalid, err := NewSimpleReadable(vfs.NewMemFile([]byte("invalid sst bytes"))) + if err != nil { + t.Fatal(err) + } testCases := []struct { - file vfs.File + readable objstorage.Readable expected string }{ {nil, "nil file"}, - {vfs.NewMemFile([]byte("invalid sst bytes")), "invalid table"}, + {invalid, "invalid table"}, } for _, tc := range testCases { - r, err := NewReader(tc.file, ReaderOptions{}) + r, err := NewReader(tc.readable, ReaderOptions{}) if !strings.Contains(err.Error(), tc.expected) { t.Fatalf("expected %q, but found %q", tc.expected, err.Error()) } @@ -574,7 +579,8 @@ func TestReaderCheckComparerMerger(t *testing.T) { for _, merger := range c.mergers { mergers[merger.Name] = merger } - r, err := NewReader(f1, ReaderOptions{}, comparers, mergers) + + r, err := newReader(f1, ReaderOptions{}, comparers, mergers) if err != nil { if r != nil { t.Fatalf("found non-nil reader returned with non-nil error %q", err.Error()) @@ -651,19 +657,21 @@ func TestBytesIterated(t *testing.T) { } func TestCompactionIteratorSetupForCompaction(t *testing.T) { + tmpDir := path.Join(t.TempDir()) + provider := objstorage.New(objstorage.DefaultSettings(vfs.Default, tmpDir)) blockSizes := []int{10, 100, 1000, 4096, math.MaxInt32} for _, blockSize := range blockSizes { for _, indexBlockSize := range blockSizes { for _, numEntries := range []uint64{0, 1, 1e5} { - r := buildTestTable(t, numEntries, blockSize, indexBlockSize, DefaultCompression) + r := buildTestTableWithProvider(t, provider, numEntries, blockSize, indexBlockSize, DefaultCompression) var bytesIterated uint64 citer, err := r.NewCompactionIter(&bytesIterated, TrivialReaderProvider{Reader: r}) require.NoError(t, err) switch i := citer.(type) { case *compactionIterator: - require.NotNil(t, i.dataRS.sequentialFile) + require.True(t, objstorage.TestingCheckMaxReadahead(i.dataRH)) case *twoLevelCompactionIterator: - require.NotNil(t, i.dataRS.sequentialFile) + require.True(t, objstorage.TestingCheckMaxReadahead(i.dataRH)) default: require.Failf(t, fmt.Sprintf("unknown compaction iterator type: %T", citer), "") } @@ -674,50 +682,6 @@ func TestCompactionIteratorSetupForCompaction(t *testing.T) { } } -func TestMaybeReadahead(t *testing.T) { - var rs readaheadState - datadriven.RunTest(t, "testdata/readahead", func(t *testing.T, d *datadriven.TestData) string { - cacheHit := false - switch d.Cmd { - case "reset": - rs.size = initialReadaheadSize - rs.limit = 0 - rs.numReads = 0 - return "" - - case "cache-read": - cacheHit = true - fallthrough - case "read": - args := strings.Split(d.Input, ",") - if len(args) != 2 { - return "expected 2 args: offset, size" - } - - offset, err := strconv.ParseInt(strings.TrimSpace(args[0]), 10, 64) - require.NoError(t, err) - size, err := strconv.ParseInt(strings.TrimSpace(args[1]), 10, 64) - require.NoError(t, err) - var raSize int64 - if cacheHit { - rs.recordCacheHit(offset, size) - } else { - raSize = rs.maybeReadahead(offset, size) - } - - var buf strings.Builder - fmt.Fprintf(&buf, "readahead: %d\n", raSize) - fmt.Fprintf(&buf, "numReads: %d\n", rs.numReads) - fmt.Fprintf(&buf, "size: %d\n", rs.size) - fmt.Fprintf(&buf, "prevSize: %d\n", rs.prevSize) - fmt.Fprintf(&buf, "limit: %d", rs.limit) - return buf.String() - default: - return fmt.Sprintf("unknown command: %s", d.Cmd) - } - }) -} - func TestReaderChecksumErrors(t *testing.T) { for _, checksumType := range []ChecksumType{ChecksumTypeCRC32c, ChecksumTypeXXHash64} { t.Run(fmt.Sprintf("checksum-type=%d", checksumType), func(t *testing.T) { @@ -753,7 +717,7 @@ func TestReaderChecksumErrors(t *testing.T) { f, err := mem.Open("test") require.NoError(t, err) - r, err := NewReader(f, ReaderOptions{}) + r, err := newReader(f, ReaderOptions{}) require.NoError(t, err) layout, err = r.Layout() require.NoError(t, err) @@ -784,7 +748,7 @@ func TestReaderChecksumErrors(t *testing.T) { corrupted, err = mem.Open("corrupted") require.NoError(t, err) - r, err := NewReader(corrupted, ReaderOptions{}) + r, err := newReader(corrupted, ReaderOptions{}) require.NoError(t, err) iter, err := r.NewIter(nil, nil) @@ -923,7 +887,7 @@ func TestValidateBlockChecksums(t *testing.T) { require.NoError(t, f.Close()) filter := bloom.FilterPolicy(10) - r, err := NewReader(fCopy, ReaderOptions{ + r, err := newReader(fCopy, ReaderOptions{ Filters: map[string]FilterPolicy{ filter.Name(): filter, }, @@ -1017,7 +981,7 @@ func TestReader_TableFormat(t *testing.T) { f, err = fs.Open("test") require.NoError(t, err) - r, err := NewReader(f, ReaderOptions{}) + r, err := newReader(f, ReaderOptions{}) require.NoError(t, err) defer r.Close() @@ -1036,8 +1000,18 @@ func TestReader_TableFormat(t *testing.T) { func buildTestTable( t *testing.T, numEntries uint64, blockSize, indexBlockSize int, compression Compression, ) *Reader { - mem := vfs.NewMem() - f0, err := mem.Create("test") + provider := objstorage.New(objstorage.DefaultSettings(vfs.NewMem(), "" /* dirName */)) + return buildTestTableWithProvider(t, provider, numEntries, blockSize, indexBlockSize, compression) +} + +func buildTestTableWithProvider( + t *testing.T, + provider *objstorage.Provider, + numEntries uint64, + blockSize, indexBlockSize int, + compression Compression, +) *Reader { + f0, err := provider.Create(base.FileTypeTable, 0 /* fileNum */) require.NoError(t, err) w := NewWriter(f0, WriterOptions{ @@ -1059,16 +1033,13 @@ func buildTestTable( require.NoError(t, w.Close()) // Re-open that filename for reading. - f1, err := mem.Open("test") + f1, err := provider.OpenForReading(base.FileTypeTable, 0 /* fileNum */) require.NoError(t, err) c := cache.New(128 << 20) defer c.Unref() r, err := NewReader(f1, ReaderOptions{ Cache: c, - }, FileReopenOpt{ - FS: mem, - Filename: "test", }) require.NoError(t, err) return r @@ -1106,7 +1077,7 @@ func buildBenchmarkTable( } c := cache.New(128 << 20) defer c.Unref() - r, err := NewReader(f1, ReaderOptions{ + r, err := newReader(f1, ReaderOptions{ Cache: c, }) if err != nil { @@ -1386,7 +1357,7 @@ func BenchmarkIteratorScanManyVersions(b *testing.B) { // Re-open the filename for reading. f0, err = mem.Open("bench") require.NoError(b, err) - r, err := NewReader(f0, ReaderOptions{ + r, err := newReader(f0, ReaderOptions{ Cache: c, Comparer: testkeys.Comparer, }) @@ -1479,7 +1450,7 @@ func BenchmarkIteratorScanNextPrefix(b *testing.B) { // Re-open the filename for reading. f0, err = mem.Open("bench") require.NoError(b, err) - r, err = NewReader(f0, ReaderOptions{ + r, err = newReader(f0, ReaderOptions{ Cache: c, Comparer: testkeys.Comparer, }) @@ -1576,3 +1547,11 @@ func BenchmarkIteratorScanNextPrefix(b *testing.B) { }) } } + +func newReader(r ReadableFile, o ReaderOptions, extraOpts ...ReaderOption) (*Reader, error) { + readable, err := NewSimpleReadable(r) + if err != nil { + return nil, err + } + return NewReader(readable, o, extraOpts...) +} diff --git a/sstable/suffix_rewriter.go b/sstable/suffix_rewriter.go index 6d854d29aa..9eb360da0a 100644 --- a/sstable/suffix_rewriter.go +++ b/sstable/suffix_rewriter.go @@ -3,15 +3,14 @@ package sstable import ( "bytes" "math" - "os" "sync" - "time" "github.com/cespare/xxhash/v2" "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/bytealloc" "github.com/cockroachdb/pebble/internal/rangekey" + "github.com/cockroachdb/pebble/objstorage" ) // RewriteKeySuffixes copies the content of the passed SSTable bytes to a new @@ -31,7 +30,7 @@ import ( func RewriteKeySuffixes( sst []byte, rOpts ReaderOptions, - out writeCloseSyncer, + out objstorage.Writable, o WriterOptions, from, to []byte, concurrency int, @@ -45,7 +44,7 @@ func RewriteKeySuffixes( } func rewriteKeySuffixesInBlocks( - r *Reader, out writeCloseSyncer, o WriterOptions, from, to []byte, concurrency int, + r *Reader, out objstorage.Writable, o WriterOptions, from, to []byte, concurrency int, ) (*WriterMetadata, error) { if o.Comparer == nil || o.Comparer.Split == nil { return nil, errors.New("a valid splitter is required to define suffix to replace replace suffix") @@ -283,7 +282,7 @@ func rewriteDataBlocksToWriter( for i := range blocks { // Write the rewritten block to the file. - n, err := w.writer.Write(blocks[i].data) + n, err := w.writable.Write(blocks[i].data) if err != nil { return err } @@ -389,7 +388,7 @@ func (c copyFilterWriter) policyName() string { return c.origPolicyName } // more work to rederive filters, props, etc, however re-doing that work makes // it less restrictive -- props no longer need to func RewriteKeySuffixesViaWriter( - r *Reader, out writeCloseSyncer, o WriterOptions, from, to []byte, + r *Reader, out objstorage.Writable, o WriterOptions, from, to []byte, ) (*WriterMetadata, error) { if o.Comparer == nil || o.Comparer.Split == nil { return nil, errors.New("a valid splitter is required to define suffix to replace replace suffix") @@ -444,11 +443,11 @@ func RewriteKeySuffixesViaWriter( // NewMemReader opens a reader over the SST stored in the passed []byte. func NewMemReader(sst []byte, o ReaderOptions) (*Reader, error) { - return NewReader(memReader{sst, bytes.NewReader(sst), sizeOnlyStat(int64(len(sst)))}, o) + return NewReader(newMemReader(sst), o) } func readBlockBuf(r *Reader, bh BlockHandle, buf []byte) ([]byte, []byte, error) { - raw := r.file.(memReader).b[bh.Offset : bh.Offset+bh.Length+blockTrailerLen] + raw := r.readable.(*memReader).b[bh.Offset : bh.Offset+bh.Length+blockTrailerLen] if err := checkChecksum(r.checksumType, raw, bh, 0); err != nil { return nil, buf, err } @@ -472,27 +471,38 @@ func readBlockBuf(r *Reader, bh BlockHandle, buf []byte) ([]byte, []byte, error) // sstable.Reader. It supports concurrent use, and does so without locking in // contrast to the heavier read/write vfs.MemFile. type memReader struct { - b []byte - r *bytes.Reader - s sizeOnlyStat + b []byte + r *bytes.Reader + rh objstorage.NoopReadaheadHandle } -var _ ReadableFile = memReader{} +var _ objstorage.Readable = (*memReader)(nil) + +func newMemReader(b []byte) *memReader { + r := &memReader{ + b: b, + r: bytes.NewReader(b), + } + r.rh = objstorage.MakeNoopReadaheadHandle(r) + return r +} // ReadAt implements io.ReaderAt. -func (m memReader) ReadAt(p []byte, off int64) (n int, err error) { return m.r.ReadAt(p, off) } +func (m *memReader) ReadAt(p []byte, off int64) (n int, err error) { + return m.r.ReadAt(p, off) +} // Close implements io.Closer. -func (memReader) Close() error { return nil } - -// Stat implements ReadableFile. -func (m memReader) Stat() (os.FileInfo, error) { return m.s, nil } +func (*memReader) Close() error { + return nil +} -type sizeOnlyStat int64 +// Stat implements objstorage.Readable. +func (m *memReader) Size() int64 { + return int64(len(m.b)) +} -func (s sizeOnlyStat) Size() int64 { return int64(s) } -func (sizeOnlyStat) IsDir() bool { panic(errors.AssertionFailedf("unimplemented")) } -func (sizeOnlyStat) ModTime() time.Time { panic(errors.AssertionFailedf("unimplemented")) } -func (sizeOnlyStat) Mode() os.FileMode { panic(errors.AssertionFailedf("unimplemented")) } -func (sizeOnlyStat) Name() string { panic(errors.AssertionFailedf("unimplemented")) } -func (sizeOnlyStat) Sys() interface{} { panic(errors.AssertionFailedf("unimplemented")) } +// NewReadaheadHandle implements objstorage.Readable. +func (m *memReader) NewReadaheadHandle() objstorage.ReadaheadHandle { + return &m.rh +} diff --git a/sstable/suffix_rewriter_test.go b/sstable/suffix_rewriter_test.go index 1e8ac6764b..f809442f8f 100644 --- a/sstable/suffix_rewriter_test.go +++ b/sstable/suffix_rewriter_test.go @@ -10,6 +10,7 @@ import ( "github.com/cockroachdb/pebble/bloom" "github.com/cockroachdb/pebble/internal/base" + "github.com/cockroachdb/pebble/objstorage" "github.com/stretchr/testify/require" ) @@ -119,17 +120,19 @@ func TestRewriteSuffixProps(t *testing.T) { } // memFile is a file-like struct that buffers all data written to it in memory. -// Implements the writeCloseSyncer interface. +// Implements the objstorage.Writable interface. type memFile struct { bytes.Buffer } -// Close implements the writeCloseSyncer interface. +var _ objstorage.Writable = (*memFile)(nil) + +// Close implements the objstorage.Writable interface. func (*memFile) Close() error { return nil } -// Sync implements the writeCloseSyncer interface. +// Sync implements the objstorage.Writable interface. func (*memFile) Sync() error { return nil } @@ -216,8 +219,7 @@ func BenchmarkRewriteSST(b *testing.B) { r := files[comp][sz] b.Run(fmt.Sprintf("keys=%d", sizes[sz]), func(b *testing.B) { b.Run("ReaderWriterLoop", func(b *testing.B) { - stat, _ := r.file.Stat() - b.SetBytes(stat.Size()) + b.SetBytes(r.readable.Size()) for i := 0; i < b.N; i++ { if _, err := RewriteKeySuffixesViaWriter(r, &discardFile{}, writerOpts, from, to); err != nil { b.Fatal(err) @@ -226,8 +228,7 @@ func BenchmarkRewriteSST(b *testing.B) { }) for _, concurrency := range []int{1, 2, 4, 8, 16} { b.Run(fmt.Sprintf("RewriteKeySuffixes,concurrency=%d", concurrency), func(b *testing.B) { - stat, _ := r.file.Stat() - b.SetBytes(stat.Size()) + b.SetBytes(r.readable.Size()) for i := 0; i < b.N; i++ { if _, err := rewriteKeySuffixesInBlocks(r, &discardFile{}, writerOpts, []byte("_123"), []byte("_456"), concurrency); err != nil { b.Fatal(err) diff --git a/sstable/table.go b/sstable/table.go index 8fe3c76ee8..92e3d6e323 100644 --- a/sstable/table.go +++ b/sstable/table.go @@ -73,6 +73,7 @@ import ( "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/internal/base" + "github.com/cockroachdb/pebble/objstorage" ) /* @@ -304,18 +305,15 @@ type footer struct { footerBH BlockHandle } -func readFooter(f ReadableFile) (footer, error) { +func readFooter(f objstorage.Readable) (footer, error) { var footer footer - stat, err := f.Stat() - if err != nil { - return footer, errors.Wrap(err, "pebble/table: invalid table (could not stat file)") - } - if stat.Size() < minFooterLen { + size := f.Size() + if size < minFooterLen { return footer, base.CorruptionErrorf("pebble/table: invalid table (file size is too small)") } buf := make([]byte, maxFooterLen) - off := stat.Size() - maxFooterLen + off := size - maxFooterLen if off < 0 { off = 0 } @@ -369,7 +367,7 @@ func readFooter(f ReadableFile) (footer, error) { } { - end := uint64(stat.Size()) + end := uint64(size) var n int footer.metaindexBH, n = decodeBlockHandle(buf) if n == 0 || footer.metaindexBH.Offset+footer.metaindexBH.Length > end { diff --git a/sstable/table_test.go b/sstable/table_test.go index 1c5b4afe3c..92d5cd67de 100644 --- a/sstable/table_test.go +++ b/sstable/table_test.go @@ -125,7 +125,7 @@ func check(f vfs.File, comparer *Comparer, fp FilterPolicy) error { } } - r, err := NewReader(f, opts) + r, err := newReader(f, opts) if err != nil { return err } @@ -499,7 +499,7 @@ func TestBloomFilterFalsePositiveRate(t *testing.T) { c := &countingFilterPolicy{ FilterPolicy: bloom.FilterPolicy(1), } - r, err := NewReader(f, ReaderOptions{ + r, err := newReader(f, ReaderOptions{ Filters: map[string]FilterPolicy{ c.Name(): c, }, @@ -633,7 +633,7 @@ func TestFinalBlockIsWritten(t *testing.T) { t.Errorf("nk=%d, vLen=%d: memFS open: %v", nk, vLen, err) continue } - r, err := NewReader(rf, ReaderOptions{}) + r, err := newReader(rf, ReaderOptions{}) if err != nil { t.Errorf("nk=%d, vLen=%d: reader open: %v", nk, vLen, err) } @@ -666,7 +666,7 @@ func TestReaderGlobalSeqNum(t *testing.T) { f, err := os.Open(filepath.FromSlash("testdata/h.sst")) require.NoError(t, err) - r, err := NewReader(f, ReaderOptions{}) + r, err := newReader(f, ReaderOptions{}) require.NoError(t, err) const globalSeqNum = 42 @@ -689,7 +689,7 @@ func TestMetaIndexEntriesSorted(t *testing.T) { TableFilter, nil, nil, 4096, 4096) require.NoError(t, err) - r, err := NewReader(f, ReaderOptions{}) + r, err := newReader(f, ReaderOptions{}) require.NoError(t, err) b, err := r.readBlock(r.metaIndexBH, nil /* transform */, nil /* attrs */, nil /* stats */) @@ -747,9 +747,12 @@ func TestFooterRoundTrip(t *testing.T) { f, err = mem.Open("test") require.NoError(t, err) - result, err := readFooter(f) + readable, err := NewSimpleReadable(f) require.NoError(t, err) - require.NoError(t, f.Close()) + + result, err := readFooter(readable) + require.NoError(t, err) + require.NoError(t, readable.Close()) if diff := pretty.Diff(footer, result); diff != nil { t.Fatalf("expected %+v, but found %+v\n%s", @@ -797,7 +800,10 @@ func TestReadFooter(t *testing.T) { f, err = mem.Open("test") require.NoError(t, err) - if _, err := readFooter(f); err == nil { + readable, err := NewSimpleReadable(f) + require.NoError(t, err) + + if _, err := readFooter(readable); err == nil { t.Fatalf("expected %q, but found success", c.expected) } else if !strings.Contains(err.Error(), c.expected) { t.Fatalf("expected %q, but found %v", c.expected, err) diff --git a/sstable/writer.go b/sstable/writer.go index b04067b81e..6ce07ac2b5 100644 --- a/sstable/writer.go +++ b/sstable/writer.go @@ -5,11 +5,9 @@ package sstable import ( - "bufio" "bytes" "encoding/binary" "fmt" - "io" "math" "runtime" "sync" @@ -24,6 +22,7 @@ import ( "github.com/cockroachdb/pebble/internal/keyspan" "github.com/cockroachdb/pebble/internal/private" "github.com/cockroachdb/pebble/internal/rangekey" + "github.com/cockroachdb/pebble/objstorage" ) // encodedBHPEstimatedSize estimates the size of the encoded BlockHandleWithProperties. @@ -110,22 +109,11 @@ func (m *WriterMetadata) updateSeqNum(seqNum uint64) { } } -type flusher interface { - Flush() error -} - -type writeCloseSyncer interface { - io.WriteCloser - Sync() error -} - // Writer is a table writer. type Writer struct { - writer io.Writer - bufWriter *bufio.Writer - syncer writeCloseSyncer - meta WriterMetadata - err error + writable objstorage.Writable + meta WriterMetadata + err error // cacheID and fileNum are used to remove blocks written to the sstable from // the cache, providing a defense in depth against bugs which cause cache // collisions. @@ -1667,12 +1655,12 @@ func (w *Writer) writeCompressedBlock(block []byte, blockTrailerBuf []byte) (Blo } // Write the bytes to the file. - n, err := w.writer.Write(block) + n, err := w.writable.Write(block) if err != nil { return BlockHandle{}, err } w.meta.Size += uint64(n) - n, err = w.writer.Write(blockTrailerBuf[:blockTrailerLen]) + n, err = w.writable.Write(blockTrailerBuf[:blockTrailerLen]) if err != nil { return BlockHandle{}, err } @@ -1695,7 +1683,7 @@ func (w *Writer) Write(blockWithTrailer []byte) (n int, err error) { w.cache.Delete(w.cacheID, w.fileNum, offset) } w.meta.Size += uint64(len(blockWithTrailer)) - return w.writer.Write(blockWithTrailer) + return w.writable.Write(blockWithTrailer) } func (w *Writer) writeBlock( @@ -1743,14 +1731,14 @@ func (w *Writer) Close() (err error) { // the same object to a sync.Pool. w.valueBlockWriter = nil } - if w.syncer == nil { + if w.writable == nil { return } - err1 := w.syncer.Close() + err1 := w.writable.Close() if err == nil { err = err1 } - w.syncer = nil + w.writable = nil }() // finish must be called before we check for an error, because finish will @@ -1998,21 +1986,13 @@ func (w *Writer) Close() (err error) { indexBH: indexBH, } var n int - if n, err = w.writer.Write(footer.encode(w.blockBuf.tmp[:])); err != nil { + if n, err = w.writable.Write(footer.encode(w.blockBuf.tmp[:])); err != nil { w.err = err return w.err } w.meta.Size += uint64(n) w.meta.Properties = w.props - // Flush the buffer. - if w.bufWriter != nil { - if err := w.bufWriter.Flush(); err != nil { - w.err = err - return err - } - } - // Check that the features present in the table are compatible with the format // configured for the table. if err = w.assertFormatCompatibility(); err != nil { @@ -2020,7 +2000,7 @@ func (w *Writer) Close() (err error) { return w.err } - if err := w.syncer.Sync(); err != nil { + if err := w.writable.Sync(); err != nil { w.err = err return err } @@ -2051,7 +2031,7 @@ func (w *Writer) EstimatedSize() uint64 { // Metadata returns the metadata for the finished sstable. Only valid to call // after the sstable has been finished. func (w *Writer) Metadata() (*WriterMetadata, error) { - if w.syncer != nil { + if w.writable != nil { return nil, errors.New("pebble: writer is not closed") } return &w.meta, nil @@ -2106,10 +2086,10 @@ func (i internalTableOpt) writerApply(w *Writer) { // NewWriter returns a new table writer for the file. Closing the writer will // close the file. -func NewWriter(f writeCloseSyncer, o WriterOptions, extraOpts ...WriterOption) *Writer { +func NewWriter(writable objstorage.Writable, o WriterOptions, extraOpts ...WriterOption) *Writer { o = o.ensureDefaults() w := &Writer{ - syncer: f, + writable: writable, meta: WriterMetadata{ SmallestSeqNum: math.MaxUint64, }, @@ -2159,8 +2139,8 @@ func NewWriter(f writeCloseSyncer, o WriterOptions, extraOpts ...WriterOption) * w.coordination.init(o.Parallelism, w) - if f == nil { - w.err = errors.New("pebble: nil file") + if writable == nil { + w.err = errors.New("pebble: nil writable") return w } @@ -2242,14 +2222,6 @@ func NewWriter(f writeCloseSyncer, o WriterOptions, extraOpts ...WriterOption) * // Initialize the range key fragmenter and encoder. w.fragmenter.Emit = w.coalesceSpans w.rangeKeyEncoder.Emit = w.addRangeKey - - // If f does not have a Flush method, do our own buffering. - if _, ok := f.(flusher); ok { - w.writer = f - } else { - w.bufWriter = bufio.NewWriter(f) - w.writer = w.bufWriter - } return w } diff --git a/sstable/writer_rangekey_test.go b/sstable/writer_rangekey_test.go index f80fd583d2..210a78f93e 100644 --- a/sstable/writer_rangekey_test.go +++ b/sstable/writer_rangekey_test.go @@ -82,7 +82,7 @@ func TestWriter_RangeKeys(t *testing.T) { return nil, err } - r, err = NewReader(f, ReaderOptions{Comparer: cmp}) + r, err = newReader(f, ReaderOptions{Comparer: cmp}) if err != nil { return nil, err } diff --git a/sstable/writer_test.go b/sstable/writer_test.go index 371da72c8d..38ba0d9552 100644 --- a/sstable/writer_test.go +++ b/sstable/writer_test.go @@ -580,7 +580,7 @@ func TestWriterClearCache(t *testing.T) { f, err := mem.Open("test") require.NoError(t, err) - r, err := NewReader(f, opts) + r, err := newReader(f, opts) require.NoError(t, err) layout, err := r.Layout() diff --git a/table_cache.go b/table_cache.go index 292f2bd862..fe11830f3d 100644 --- a/table_cache.go +++ b/table_cache.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/pebble/internal/keyspan" "github.com/cockroachdb/pebble/internal/manifest" "github.com/cockroachdb/pebble/internal/private" + "github.com/cockroachdb/pebble/objstorage" "github.com/cockroachdb/pebble/sstable" "github.com/cockroachdb/pebble/vfs" ) @@ -922,14 +923,13 @@ type tableCacheValue struct { } func (v *tableCacheValue) load(meta *fileMetadata, c *tableCacheShard, dbOpts *tableCacheOpts) { - // Try opening the fileTypeTable first. - var f vfs.File - v.filename = base.MakeFilepath(dbOpts.fs, dbOpts.dirname, fileTypeTable, meta.FileNum) - f, v.err = dbOpts.fs.Open(v.filename, vfs.RandomReadsOption) + // Try opening the file first. + provider := objstorage.New(objstorage.DefaultSettings(dbOpts.fs, dbOpts.dirname)) + var f objstorage.Readable + f, v.err = provider.OpenForReading(fileTypeTable, meta.FileNum) if v.err == nil { cacheOpts := private.SSTableCacheOpts(dbOpts.cacheID, meta.FileNum).(sstable.ReaderOption) - reopenOpt := sstable.FileReopenOpt{FS: dbOpts.fs, Filename: v.filename} - v.reader, v.err = sstable.NewReader(f, dbOpts.opts, cacheOpts, dbOpts.filterMetrics, reopenOpt) + v.reader, v.err = sstable.NewReader(f, dbOpts.opts, cacheOpts, dbOpts.filterMetrics) } if v.err == nil { if meta.SmallestSeqNum == meta.LargestSeqNum { diff --git a/table_stats_test.go b/table_stats_test.go index 898ddd946d..8fa1e6c4ae 100644 --- a/table_stats_test.go +++ b/table_stats_test.go @@ -201,7 +201,11 @@ func TestTableRangeDeletionIter(t *testing.T) { return err.Error() } var r *sstable.Reader - r, err = sstable.NewReader(f, sstable.ReaderOptions{}) + readable, err := sstable.NewSimpleReadable(f) + if err != nil { + return err.Error() + } + r, err = sstable.NewReader(readable, sstable.ReaderOptions{}) if err != nil { return err.Error() } diff --git a/testdata/event_listener b/testdata/event_listener index 24911f9f17..071f3740ee 100644 --- a/testdata/event_listener +++ b/testdata/event_listener @@ -204,7 +204,7 @@ compact 1 2.3 K 0 B 0 (size == estimated-debt, scor zmemtbl 0 0 B ztbl 0 0 B bcache 8 1.4 K 11.1% (score == hit-rate) - tcache 1 712 B 50.0% (score == hit-rate) + tcache 1 680 B 50.0% (score == hit-rate) snaps 0 - 0 (score == earliest seq num) titers 0 filter - - 0.0% (score == utility) diff --git a/testdata/ingest b/testdata/ingest index 8d9d4a9ebb..4a0b7a4fc4 100644 --- a/testdata/ingest +++ b/testdata/ingest @@ -3,7 +3,7 @@ ingest ingest non-existent ---- -stat non-existent: file does not exist +open non-existent: file does not exist # Elide ingestion of empty sstables. @@ -48,7 +48,7 @@ compact 0 0 B 0 B 0 (size == estimated-debt, scor zmemtbl 0 0 B ztbl 0 0 B bcache 8 1.5 K 42.9% (score == hit-rate) - tcache 1 712 B 50.0% (score == hit-rate) + tcache 1 680 B 50.0% (score == hit-rate) snaps 0 - 0 (score == earliest seq num) titers 0 filter - - 0.0% (score == utility) diff --git a/testdata/metrics b/testdata/metrics index 512999b648..0ff6260c8b 100644 --- a/testdata/metrics +++ b/testdata/metrics @@ -34,7 +34,7 @@ compact 0 0 B 0 B 0 (size == estimated-debt, scor zmemtbl 1 256 K ztbl 0 0 B bcache 4 697 B 0.0% (score == hit-rate) - tcache 1 712 B 0.0% (score == hit-rate) + tcache 1 680 B 0.0% (score == hit-rate) snaps 0 - 0 (score == earliest seq num) titers 1 filter - - 0.0% (score == utility) @@ -82,7 +82,7 @@ compact 1 0 B 0 B 0 (size == estimated-debt, scor zmemtbl 2 512 K ztbl 2 1.5 K bcache 8 1.4 K 42.9% (score == hit-rate) - tcache 2 1.4 K 66.7% (score == hit-rate) + tcache 2 1.3 K 66.7% (score == hit-rate) snaps 0 - 0 (score == earliest seq num) titers 2 filter - - 0.0% (score == utility) @@ -115,7 +115,7 @@ compact 1 0 B 0 B 0 (size == estimated-debt, scor zmemtbl 1 256 K ztbl 2 1.5 K bcache 8 1.4 K 42.9% (score == hit-rate) - tcache 2 1.4 K 66.7% (score == hit-rate) + tcache 2 1.3 K 66.7% (score == hit-rate) snaps 0 - 0 (score == earliest seq num) titers 2 filter - - 0.0% (score == utility) @@ -145,7 +145,7 @@ compact 1 0 B 0 B 0 (size == estimated-debt, scor zmemtbl 1 256 K ztbl 1 770 B bcache 4 697 B 42.9% (score == hit-rate) - tcache 1 712 B 66.7% (score == hit-rate) + tcache 1 680 B 66.7% (score == hit-rate) snaps 0 - 0 (score == earliest seq num) titers 1 filter - - 0.0% (score == utility) diff --git a/tool/db.go b/tool/db.go index d9893ec6cd..963e310a30 100644 --- a/tool/db.go +++ b/tool/db.go @@ -15,6 +15,7 @@ import ( "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/humanize" "github.com/cockroachdb/pebble/internal/manifest" + "github.com/cockroachdb/pebble/objstorage" "github.com/cockroachdb/pebble/record" "github.com/cockroachdb/pebble/sstable" "github.com/cockroachdb/pebble/tool/logs" @@ -640,8 +641,8 @@ func (p *props) update(o props) { } func (d *dbT) addProps(dir string, m *manifest.FileMetadata, p *props) error { - path := base.MakeFilepath(d.opts.FS, dir, base.FileTypeTable, m.FileNum) - f, err := d.opts.FS.Open(path) + backend := objstorage.New(objstorage.DefaultSettings(d.opts.FS, dir)) + f, err := backend.OpenForReading(base.FileTypeTable, m.FileNum) if err != nil { return err } diff --git a/tool/find.go b/tool/find.go index c85b7e6ad1..c7c3bde25d 100644 --- a/tool/find.go +++ b/tool/find.go @@ -413,7 +413,11 @@ func (f *findT) searchTables(stdout io.Writer, searchKey []byte, refs []findRef) Comparer: f.opts.Comparer, Filters: f.opts.Filters, } - r, err := sstable.NewReader(tf, opts, f.comparers, f.mergers, + readable, err := sstable.NewSimpleReadable(tf) + if err != nil { + return err + } + r, err := sstable.NewReader(readable, opts, f.comparers, f.mergers, private.SSTableRawTombstonesOpt.(sstable.ReaderOption)) if err != nil { return err diff --git a/tool/sstable.go b/tool/sstable.go index 4a7d910ce4..1890a23b01 100644 --- a/tool/sstable.go +++ b/tool/sstable.go @@ -141,13 +141,17 @@ inclusive-inclusive range specified by --start and --end. } func (s *sstableT) newReader(f vfs.File) (*sstable.Reader, error) { + readable, err := sstable.NewSimpleReadable(f) + if err != nil { + return nil, err + } o := sstable.ReaderOptions{ Cache: pebble.NewCache(128 << 20 /* 128 MB */), Comparer: s.opts.Comparer, Filters: s.opts.Filters, } defer o.Cache.Unref() - return sstable.NewReader(f, o, s.comparers, s.mergers, + return sstable.NewReader(readable, o, s.comparers, s.mergers, private.SSTableRawTombstonesOpt.(sstable.ReaderOption)) }