Skip to content

Commit

Permalink
Merge pull request #2262 from RaduBerinde/clean-up-fd
Browse files Browse the repository at this point in the history
vfs: clean up Fd functionality
  • Loading branch information
RaduBerinde authored Jan 23, 2023
2 parents 1303cb0 + 0d4ce49 commit b418e86
Show file tree
Hide file tree
Showing 12 changed files with 68 additions and 94 deletions.
4 changes: 4 additions & 0 deletions internal/errorfs/errorfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,3 +369,7 @@ func (f *errorFile) Sync() error {
}
return f.file.Sync()
}

func (f *errorFile) Fd() uintptr {
return f.file.Fd()
}
9 changes: 4 additions & 5 deletions sstable/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3017,11 +3017,10 @@ func (r *Reader) readBlock(
}
}
if raState.sequentialFile == nil {
type fd interface {
Fd() uintptr
}
if f, ok := r.file.(fd); ok {
_ = vfs.Prefetch(f.Fd(), bh.Offset, uint64(readaheadSize))
if f, ok := r.file.(vfs.File); ok {
if fd := f.Fd(); fd != vfs.InvalidFd {
_ = vfs.Prefetch(fd, bh.Offset, uint64(readaheadSize))
}
}
}
}
Expand Down
35 changes: 20 additions & 15 deletions vfs/disk_full.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,10 @@ func (fs *enospcFS) Create(name string) (File, error) {
f, err = fs.inner.Create(name)
}
if f != nil {
f = WithFd(f, enospcFile{
f = &enospcFile{
fs: fs,
inner: f,
})
}
}
return f, err
}
Expand All @@ -222,21 +222,21 @@ func (fs *enospcFS) Link(oldname, newname string) error {
func (fs *enospcFS) Open(name string, opts ...OpenOption) (File, error) {
f, err := fs.inner.Open(name, opts...)
if f != nil {
f = WithFd(f, enospcFile{
f = &enospcFile{
fs: fs,
inner: f,
})
}
}
return f, err
}

func (fs *enospcFS) OpenDir(name string) (File, error) {
f, err := fs.inner.OpenDir(name)
if f != nil {
f = WithFd(f, enospcFile{
f = &enospcFile{
fs: fs,
inner: f,
})
}
}
return f, err
}
Expand Down Expand Up @@ -288,10 +288,10 @@ func (fs *enospcFS) ReuseForWrite(oldname, newname string) (File, error) {
}

if f != nil {
f = WithFd(f, enospcFile{
f = &enospcFile{
fs: fs,
inner: f,
})
}
}
return f, err
}
Expand Down Expand Up @@ -349,19 +349,21 @@ type enospcFile struct {
inner File
}

func (f enospcFile) Close() error {
var _ File = (*enospcFile)(nil)

func (f *enospcFile) Close() error {
return f.inner.Close()
}

func (f enospcFile) Read(p []byte) (n int, err error) {
func (f *enospcFile) Read(p []byte) (n int, err error) {
return f.inner.Read(p)
}

func (f enospcFile) ReadAt(p []byte, off int64) (n int, err error) {
func (f *enospcFile) ReadAt(p []byte, off int64) (n int, err error) {
return f.inner.ReadAt(p, off)
}

func (f enospcFile) Write(p []byte) (n int, err error) {
func (f *enospcFile) Write(p []byte) (n int, err error) {
gen := f.fs.waitUntilReady()

n, err = f.inner.Write(p)
Expand All @@ -375,11 +377,11 @@ func (f enospcFile) Write(p []byte) (n int, err error) {
return n, err
}

func (f enospcFile) Stat() (os.FileInfo, error) {
func (f *enospcFile) Stat() (os.FileInfo, error) {
return f.inner.Stat()
}

func (f enospcFile) Sync() error {
func (f *enospcFile) Sync() error {
gen := f.fs.waitUntilReady()

err := f.inner.Sync()
Expand All @@ -399,7 +401,10 @@ func (f enospcFile) Sync() error {
return err
}

// Ensure that *enospcFS implements the FS interface.
func (f *enospcFile) Fd() uintptr {
return f.inner.Fd()
}

var _ FS = (*enospcFS)(nil)

func isENOSPC(err error) bool {
Expand Down
6 changes: 3 additions & 3 deletions vfs/disk_health.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ func (d *diskHealthCheckingFS) Create(name string) (File, error) {
d.onSlowDisk(name, duration)
})
checkingFile.startTicker()
return WithFd(f, checkingFile), nil
return checkingFile, nil
}

// GetDiskUsage implements the FS interface.
Expand Down Expand Up @@ -420,7 +420,7 @@ func (d *diskHealthCheckingFS) OpenDir(name string) (File, error) {
d.onSlowDisk(name, duration)
})
checkingFile.startTicker()
return WithFd(f, checkingFile), nil
return checkingFile, nil
}

// PathBase implements the FS interface.
Expand Down Expand Up @@ -482,7 +482,7 @@ func (d *diskHealthCheckingFS) ReuseForWrite(oldname, newname string) (File, err
d.onSlowDisk(newname, duration)
})
checkingFile.startTicker()
return WithFd(f, checkingFile), nil
return checkingFile, nil
}

// Stat implements the FS interface.
Expand Down
4 changes: 4 additions & 0 deletions vfs/disk_health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ func (m mockFile) Stat() (os.FileInfo, error) {
panic("unimplemented")
}

func (m mockFile) Fd() uintptr {
return InvalidFd
}

func (m mockFile) Sync() error {
time.Sleep(m.syncDuration)
return nil
Expand Down
42 changes: 0 additions & 42 deletions vfs/fd.go

This file was deleted.

10 changes: 4 additions & 6 deletions vfs/fd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,11 @@ func TestFileWrappersHaveFd(t *testing.T) {
defer closer.Close()
f2, err := fs2.Open(filename)
require.NoError(t, err)
if _, ok := f2.(fdGetter); !ok {
t.Fatal("expected diskHealthCheckingFile to export Fd() method")
}
require.NotZero(t, f2.Fd())
require.NotEqual(t, f2.Fd(), InvalidFd)
// File wrapper case 2: Check if syncingFile has Fd().
f3 := NewSyncingFile(f2, SyncingFileOptions{BytesPerSync: 8 << 10 /* 8 KB */})
if _, ok := f3.(fdGetter); !ok {
t.Fatal("expected syncingFile to export Fd() method")
}
require.NotZero(t, f3.Fd())
require.NotEqual(t, f3.Fd(), InvalidFd)
require.NoError(t, f2.Close())
}
6 changes: 6 additions & 0 deletions vfs/mem_fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,8 @@ type memFile struct {
read, write bool
}

var _ File = (*memFile)(nil)

func (f *memFile) Close() error {
if n := atomic.AddInt32(&f.n.refs, -1); n < 0 {
panic(fmt.Sprintf("pebble: close of unopened file: %d", n))
Expand Down Expand Up @@ -721,6 +723,10 @@ func (f *memFile) Sync() error {
return nil
}

func (f *memFile) Fd() uintptr {
return InvalidFd
}

// Flush is a no-op and present only to prevent buffering at higher levels
// (e.g. it prevents sstable.Writer from using a bufio.Writer).
func (f *memFile) Flush() error {
Expand Down
14 changes: 5 additions & 9 deletions vfs/syncing_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type SyncingFileOptions struct {

type syncingFile struct {
File
// fd can be InvalidFd if the underlying File does not support it.
fd uintptr
useSyncRange bool
closing bool
Expand Down Expand Up @@ -49,6 +50,7 @@ type syncingFile struct {
func NewSyncingFile(f File, opts SyncingFileOptions) File {
s := &syncingFile{
File: f,
fd: f.Fd(),
noSyncOnClose: bool(opts.NoSyncOnClose),
bytesPerSync: int64(opts.BytesPerSync),
preallocateSize: int64(opts.PreallocateSize),
Expand All @@ -57,12 +59,6 @@ func NewSyncingFile(f File, opts SyncingFileOptions) File {
// data has been written to it.
s.atomic.syncOffset = -1

type fd interface {
Fd() uintptr
}
if d, ok := f.(fd); ok {
s.fd = d.Fd()
}
type dhChecker interface {
timeDiskOp(op func())
}
Expand All @@ -79,7 +75,7 @@ func NewSyncingFile(f File, opts SyncingFileOptions) File {
if s.syncData == nil {
s.syncData = s.File.Sync
}
return WithFd(f, s)
return s
}

// NB: syncingFile.Write is unsafe for concurrent use!
Expand All @@ -100,7 +96,7 @@ func (f *syncingFile) Write(p []byte) (n int, err error) {
}

func (f *syncingFile) preallocate(offset int64) error {
if f.fd == 0 || f.preallocateSize == 0 {
if f.fd == InvalidFd || f.preallocateSize == 0 {
return nil
}

Expand Down Expand Up @@ -163,7 +159,7 @@ func (f *syncingFile) maybeSync() error {
return nil
}

if f.fd == 0 {
if f.fd == InvalidFd {
return errors.WithStack(f.Sync())
}

Expand Down
4 changes: 2 additions & 2 deletions vfs/syncing_file_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func isSyncRangeSupported(fd uintptr) bool {
}

func (f *syncingFile) init() {
if f.fd == 0 {
if f.fd == InvalidFd {
return
}
f.timeDiskOp(func() {
Expand All @@ -55,7 +55,7 @@ func (f *syncingFile) init() {
}

func (f *syncingFile) syncFdatasync() error {
if f.fd == 0 {
if f.fd == InvalidFd {
return f.File.Sync()
}
var err error
Expand Down
3 changes: 1 addition & 2 deletions vfs/syncing_file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ func TestSyncingFile(t *testing.T) {
t.Fatalf("failed to wrap: %p != %p", f, s)
}
s = NewSyncingFile(f, SyncingFileOptions{BytesPerSync: 8 << 10 /* 8 KB */})
s = s.(*fdFileWrapper).File
s.(*syncingFile).fd = 1
s.(*syncingFile).syncTo = func(offset int64) error {
s.(*syncingFile).ratchetSyncOffset(offset)
Expand Down Expand Up @@ -108,7 +107,7 @@ close: test [<nil>]
return nil
}
} else {
s.fd = 0
s.fd = InvalidFd
}

write := func(n int64) {
Expand Down
25 changes: 15 additions & 10 deletions vfs/vfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,19 @@ type File interface {
io.Writer
Stat() (os.FileInfo, error)
Sync() error

// Fd returns the raw file descriptor when a File is backed by an *os.File.
// It can be used for specific functionality like Prefetch.
// Returns InvalidFd if not supported.
Fd() uintptr
}

// InvalidFd is a special value returned by File.Fd() when the file is not
// backed by an OS descriptor.
// Note: the special value is consistent with what os.File implementation
// returns on a nil receiver.
const InvalidFd uintptr = ^(uintptr(0))

// OpenOption provide an interface to do work on file handles in the Open()
// call.
type OpenOption interface {
Expand Down Expand Up @@ -240,11 +251,8 @@ var RandomReadsOption OpenOption = &randomReadsOption{}

// Apply implements the OpenOption interface.
func (randomReadsOption) Apply(f File) {
type fd interface {
Fd() uintptr
}
if fdFile, ok := f.(fd); ok {
_ = fadviseRandom(fdFile.Fd())
if fd := f.Fd(); fd != InvalidFd {
_ = fadviseRandom(fd)
}
}

Expand All @@ -257,11 +265,8 @@ var SequentialReadsOption OpenOption = &sequentialReadsOption{}

// Apply implements the OpenOption interface.
func (sequentialReadsOption) Apply(f File) {
type fd interface {
Fd() uintptr
}
if fdFile, ok := f.(fd); ok {
_ = fadviseSequential(fdFile.Fd())
if fd := f.Fd(); fd != InvalidFd {
_ = fadviseSequential(fd)
}
}

Expand Down

0 comments on commit b418e86

Please sign in to comment.