From 0d4ce49e8e754289d13e8c77eed7ea87c66a57b4 Mon Sep 17 00:00:00 2001 From: Radu Berinde Date: Mon, 23 Jan 2023 13:59:01 -0800 Subject: [PATCH] vfs: clean up Fd functionality We currently sidestep the `vfs.File` interface to obtain the raw file descriptor when the `File` is backed by an underlying `*os.File`. This mechanism (where we cast to `interface{ Fd() }`) is fragile and requires extra "fd wrappers" to plumb the `Fd` method through disk full and disk health wrappers. Each fd wrapper introduces an extra virtual table indirection on every File interface call. This change cleans this up by making all `File`s implement `Fd()`; we allow returning InvalidFd when there is no raw file descriptor. It is up to the (very few) callers to check if the result is InvalidFd. --- internal/errorfs/errorfs.go | 4 ++++ sstable/reader.go | 9 ++++---- vfs/disk_full.go | 35 ++++++++++++++++++------------- vfs/disk_health.go | 6 +++--- vfs/disk_health_test.go | 4 ++++ vfs/fd.go | 42 ------------------------------------- vfs/fd_test.go | 10 ++++----- vfs/mem_fs.go | 6 ++++++ vfs/syncing_file.go | 14 +++++-------- vfs/syncing_file_linux.go | 4 ++-- vfs/syncing_file_test.go | 3 +-- vfs/vfs.go | 25 +++++++++++++--------- 12 files changed, 68 insertions(+), 94 deletions(-) delete mode 100644 vfs/fd.go diff --git a/internal/errorfs/errorfs.go b/internal/errorfs/errorfs.go index 63feda2ce7..e34282ec47 100644 --- a/internal/errorfs/errorfs.go +++ b/internal/errorfs/errorfs.go @@ -369,3 +369,7 @@ func (f *errorFile) Sync() error { } return f.file.Sync() } + +func (f *errorFile) Fd() uintptr { + return f.file.Fd() +} diff --git a/sstable/reader.go b/sstable/reader.go index 591a35d707..768d3f6f7e 100644 --- a/sstable/reader.go +++ b/sstable/reader.go @@ -3017,11 +3017,10 @@ func (r *Reader) readBlock( } } if raState.sequentialFile == nil { - type fd interface { - Fd() uintptr - } - if f, ok := r.file.(fd); ok { - _ = vfs.Prefetch(f.Fd(), bh.Offset, uint64(readaheadSize)) + if f, ok := r.file.(vfs.File); ok { + if fd := f.Fd(); fd != vfs.InvalidFd { + _ = vfs.Prefetch(fd, bh.Offset, uint64(readaheadSize)) + } } } } diff --git a/vfs/disk_full.go b/vfs/disk_full.go index c7d12ea88c..ce84b2dc76 100644 --- a/vfs/disk_full.go +++ b/vfs/disk_full.go @@ -199,10 +199,10 @@ func (fs *enospcFS) Create(name string) (File, error) { f, err = fs.inner.Create(name) } if f != nil { - f = WithFd(f, enospcFile{ + f = &enospcFile{ fs: fs, inner: f, - }) + } } return f, err } @@ -222,10 +222,10 @@ func (fs *enospcFS) Link(oldname, newname string) error { func (fs *enospcFS) Open(name string, opts ...OpenOption) (File, error) { f, err := fs.inner.Open(name, opts...) if f != nil { - f = WithFd(f, enospcFile{ + f = &enospcFile{ fs: fs, inner: f, - }) + } } return f, err } @@ -233,10 +233,10 @@ func (fs *enospcFS) Open(name string, opts ...OpenOption) (File, error) { func (fs *enospcFS) OpenDir(name string) (File, error) { f, err := fs.inner.OpenDir(name) if f != nil { - f = WithFd(f, enospcFile{ + f = &enospcFile{ fs: fs, inner: f, - }) + } } return f, err } @@ -288,10 +288,10 @@ func (fs *enospcFS) ReuseForWrite(oldname, newname string) (File, error) { } if f != nil { - f = WithFd(f, enospcFile{ + f = &enospcFile{ fs: fs, inner: f, - }) + } } return f, err } @@ -349,19 +349,21 @@ type enospcFile struct { inner File } -func (f enospcFile) Close() error { +var _ File = (*enospcFile)(nil) + +func (f *enospcFile) Close() error { return f.inner.Close() } -func (f enospcFile) Read(p []byte) (n int, err error) { +func (f *enospcFile) Read(p []byte) (n int, err error) { return f.inner.Read(p) } -func (f enospcFile) ReadAt(p []byte, off int64) (n int, err error) { +func (f *enospcFile) ReadAt(p []byte, off int64) (n int, err error) { return f.inner.ReadAt(p, off) } -func (f enospcFile) Write(p []byte) (n int, err error) { +func (f *enospcFile) Write(p []byte) (n int, err error) { gen := f.fs.waitUntilReady() n, err = f.inner.Write(p) @@ -375,11 +377,11 @@ func (f enospcFile) Write(p []byte) (n int, err error) { return n, err } -func (f enospcFile) Stat() (os.FileInfo, error) { +func (f *enospcFile) Stat() (os.FileInfo, error) { return f.inner.Stat() } -func (f enospcFile) Sync() error { +func (f *enospcFile) Sync() error { gen := f.fs.waitUntilReady() err := f.inner.Sync() @@ -399,7 +401,10 @@ func (f enospcFile) Sync() error { return err } -// Ensure that *enospcFS implements the FS interface. +func (f *enospcFile) Fd() uintptr { + return f.inner.Fd() +} + var _ FS = (*enospcFS)(nil) func isENOSPC(err error) bool { diff --git a/vfs/disk_health.go b/vfs/disk_health.go index ceb9046ee7..81688fb3a3 100644 --- a/vfs/disk_health.go +++ b/vfs/disk_health.go @@ -367,7 +367,7 @@ func (d *diskHealthCheckingFS) Create(name string) (File, error) { d.onSlowDisk(name, duration) }) checkingFile.startTicker() - return WithFd(f, checkingFile), nil + return checkingFile, nil } // GetDiskUsage implements the FS interface. @@ -420,7 +420,7 @@ func (d *diskHealthCheckingFS) OpenDir(name string) (File, error) { d.onSlowDisk(name, duration) }) checkingFile.startTicker() - return WithFd(f, checkingFile), nil + return checkingFile, nil } // PathBase implements the FS interface. @@ -482,7 +482,7 @@ func (d *diskHealthCheckingFS) ReuseForWrite(oldname, newname string) (File, err d.onSlowDisk(newname, duration) }) checkingFile.startTicker() - return WithFd(f, checkingFile), nil + return checkingFile, nil } // Stat implements the FS interface. diff --git a/vfs/disk_health_test.go b/vfs/disk_health_test.go index 3add516eb9..77bb990ad7 100644 --- a/vfs/disk_health_test.go +++ b/vfs/disk_health_test.go @@ -40,6 +40,10 @@ func (m mockFile) Stat() (os.FileInfo, error) { panic("unimplemented") } +func (m mockFile) Fd() uintptr { + return InvalidFd +} + func (m mockFile) Sync() error { time.Sleep(m.syncDuration) return nil diff --git a/vfs/fd.go b/vfs/fd.go deleted file mode 100644 index 199d3042e4..0000000000 --- a/vfs/fd.go +++ /dev/null @@ -1,42 +0,0 @@ -// Copyright 2021 The LevelDB-Go and Pebble Authors. All rights reserved. Use -// of this source code is governed by a BSD-style license that can be found in -// the LICENSE file. - -package vfs - -// fdGetter is an interface for a file with an Fd() method. A lot of -// File related optimizations (eg. Prefetch(), WAL recycling) rely on the -// existence of the Fd method to return a raw file descriptor. -type fdGetter interface { - Fd() uintptr -} - -// fdFileWrapper is a File wrapper that also exposes an Fd() method. Used to -// wrap outer (wrapped) Files that could unintentionally hide the Fd() method -// exposed by the inner (unwrapped) File. It effectively lets the Fd() method -// bypass the outer File and go to the inner File. -type fdFileWrapper struct { - File - - // All methods usually pass through to File above, except for Fd(), which - // bypasses it and gets called directly on the inner file. - inner fdGetter -} - -func (f *fdFileWrapper) Fd() uintptr { - return f.inner.Fd() -} - -// WithFd takes an inner (unwrapped) and an outer (wrapped) vfs.File, -// and returns an fdFileWrapper if the inner file has an Fd() method. Use this -// method to fix the hiding of the Fd() method and the subsequent unintentional -// disabling of Fd-related file optimizations. -func WithFd(inner, outer File) File { - if f, ok := inner.(fdGetter); ok { - return &fdFileWrapper{ - File: outer, - inner: f, - } - } - return outer -} diff --git a/vfs/fd_test.go b/vfs/fd_test.go index e0e9cd2d3e..02d9c5466d 100644 --- a/vfs/fd_test.go +++ b/vfs/fd_test.go @@ -26,13 +26,11 @@ func TestFileWrappersHaveFd(t *testing.T) { defer closer.Close() f2, err := fs2.Open(filename) require.NoError(t, err) - if _, ok := f2.(fdGetter); !ok { - t.Fatal("expected diskHealthCheckingFile to export Fd() method") - } + require.NotZero(t, f2.Fd()) + require.NotEqual(t, f2.Fd(), InvalidFd) // File wrapper case 2: Check if syncingFile has Fd(). f3 := NewSyncingFile(f2, SyncingFileOptions{BytesPerSync: 8 << 10 /* 8 KB */}) - if _, ok := f3.(fdGetter); !ok { - t.Fatal("expected syncingFile to export Fd() method") - } + require.NotZero(t, f3.Fd()) + require.NotEqual(t, f3.Fd(), InvalidFd) require.NoError(t, f2.Close()) } diff --git a/vfs/mem_fs.go b/vfs/mem_fs.go index e298f0ee52..4b2bb6f75c 100644 --- a/vfs/mem_fs.go +++ b/vfs/mem_fs.go @@ -626,6 +626,8 @@ type memFile struct { read, write bool } +var _ File = (*memFile)(nil) + func (f *memFile) Close() error { if n := atomic.AddInt32(&f.n.refs, -1); n < 0 { panic(fmt.Sprintf("pebble: close of unopened file: %d", n)) @@ -721,6 +723,10 @@ func (f *memFile) Sync() error { return nil } +func (f *memFile) Fd() uintptr { + return InvalidFd +} + // Flush is a no-op and present only to prevent buffering at higher levels // (e.g. it prevents sstable.Writer from using a bufio.Writer). func (f *memFile) Flush() error { diff --git a/vfs/syncing_file.go b/vfs/syncing_file.go index cbf7064618..6224f3a94e 100644 --- a/vfs/syncing_file.go +++ b/vfs/syncing_file.go @@ -19,6 +19,7 @@ type SyncingFileOptions struct { type syncingFile struct { File + // fd can be InvalidFd if the underlying File does not support it. fd uintptr useSyncRange bool closing bool @@ -49,6 +50,7 @@ type syncingFile struct { func NewSyncingFile(f File, opts SyncingFileOptions) File { s := &syncingFile{ File: f, + fd: f.Fd(), noSyncOnClose: bool(opts.NoSyncOnClose), bytesPerSync: int64(opts.BytesPerSync), preallocateSize: int64(opts.PreallocateSize), @@ -57,12 +59,6 @@ func NewSyncingFile(f File, opts SyncingFileOptions) File { // data has been written to it. s.atomic.syncOffset = -1 - type fd interface { - Fd() uintptr - } - if d, ok := f.(fd); ok { - s.fd = d.Fd() - } type dhChecker interface { timeDiskOp(op func()) } @@ -79,7 +75,7 @@ func NewSyncingFile(f File, opts SyncingFileOptions) File { if s.syncData == nil { s.syncData = s.File.Sync } - return WithFd(f, s) + return s } // NB: syncingFile.Write is unsafe for concurrent use! @@ -100,7 +96,7 @@ func (f *syncingFile) Write(p []byte) (n int, err error) { } func (f *syncingFile) preallocate(offset int64) error { - if f.fd == 0 || f.preallocateSize == 0 { + if f.fd == InvalidFd || f.preallocateSize == 0 { return nil } @@ -163,7 +159,7 @@ func (f *syncingFile) maybeSync() error { return nil } - if f.fd == 0 { + if f.fd == InvalidFd { return errors.WithStack(f.Sync()) } diff --git a/vfs/syncing_file_linux.go b/vfs/syncing_file_linux.go index c1378c307d..bdc008e481 100644 --- a/vfs/syncing_file_linux.go +++ b/vfs/syncing_file_linux.go @@ -40,7 +40,7 @@ func isSyncRangeSupported(fd uintptr) bool { } func (f *syncingFile) init() { - if f.fd == 0 { + if f.fd == InvalidFd { return } f.timeDiskOp(func() { @@ -55,7 +55,7 @@ func (f *syncingFile) init() { } func (f *syncingFile) syncFdatasync() error { - if f.fd == 0 { + if f.fd == InvalidFd { return f.File.Sync() } var err error diff --git a/vfs/syncing_file_test.go b/vfs/syncing_file_test.go index 5feb4dd4b6..0dfe19c7c7 100644 --- a/vfs/syncing_file_test.go +++ b/vfs/syncing_file_test.go @@ -32,7 +32,6 @@ func TestSyncingFile(t *testing.T) { t.Fatalf("failed to wrap: %p != %p", f, s) } s = NewSyncingFile(f, SyncingFileOptions{BytesPerSync: 8 << 10 /* 8 KB */}) - s = s.(*fdFileWrapper).File s.(*syncingFile).fd = 1 s.(*syncingFile).syncTo = func(offset int64) error { s.(*syncingFile).ratchetSyncOffset(offset) @@ -108,7 +107,7 @@ close: test [] return nil } } else { - s.fd = 0 + s.fd = InvalidFd } write := func(n int64) { diff --git a/vfs/vfs.go b/vfs/vfs.go index 5e0571b96e..9c983ac320 100644 --- a/vfs/vfs.go +++ b/vfs/vfs.go @@ -31,8 +31,19 @@ type File interface { io.Writer Stat() (os.FileInfo, error) Sync() error + + // Fd returns the raw file descriptor when a File is backed by an *os.File. + // It can be used for specific functionality like Prefetch. + // Returns InvalidFd if not supported. + Fd() uintptr } +// InvalidFd is a special value returned by File.Fd() when the file is not +// backed by an OS descriptor. +// Note: the special value is consistent with what os.File implementation +// returns on a nil receiver. +const InvalidFd uintptr = ^(uintptr(0)) + // OpenOption provide an interface to do work on file handles in the Open() // call. type OpenOption interface { @@ -240,11 +251,8 @@ var RandomReadsOption OpenOption = &randomReadsOption{} // Apply implements the OpenOption interface. func (randomReadsOption) Apply(f File) { - type fd interface { - Fd() uintptr - } - if fdFile, ok := f.(fd); ok { - _ = fadviseRandom(fdFile.Fd()) + if fd := f.Fd(); fd != InvalidFd { + _ = fadviseRandom(fd) } } @@ -257,11 +265,8 @@ var SequentialReadsOption OpenOption = &sequentialReadsOption{} // Apply implements the OpenOption interface. func (sequentialReadsOption) Apply(f File) { - type fd interface { - Fd() uintptr - } - if fdFile, ok := f.(fd); ok { - _ = fadviseSequential(fdFile.Fd()) + if fd := f.Fd(); fd != InvalidFd { + _ = fadviseSequential(fd) } }