diff --git a/event_listener_test.go b/event_listener_test.go index 72907bbeb1..faa8e50675 100644 --- a/event_listener_test.go +++ b/event_listener_test.go @@ -131,6 +131,16 @@ func (f loggingFile) Sync() error { return f.File.Sync() } +func (f loggingFile) SyncData() error { + fmt.Fprintf(f.w, "sync-data: %s\n", f.name) + return f.File.SyncData() +} + +func (f loggingFile) SyncTo(length int64) (fullSync bool, err error) { + fmt.Fprintf(f.w, "sync-to(%d): %s\n", length, f.name) + return f.File.SyncTo(length) +} + // Verify event listener actions, as well as expected filesystem operations. func TestEventListener(t *testing.T) { var d *DB diff --git a/internal/errorfs/errorfs.go b/internal/errorfs/errorfs.go index e34282ec47..3b5ae12d1a 100644 --- a/internal/errorfs/errorfs.go +++ b/internal/errorfs/errorfs.go @@ -47,6 +47,8 @@ const ( OpLock // OpList describes a list directory operation. OpList + // OpFilePreallocate describes a file preallocate operation. + OpFilePreallocate // OpStat describes a path-based stat operation. OpStat // OpGetDiskUsage describes a disk usage operation. @@ -72,7 +74,7 @@ func (o Op) OpKind() OpKind { switch o { case OpOpen, OpOpenDir, OpList, OpStat, OpGetDiskUsage, OpFileRead, OpFileReadAt, OpFileStat: return OpKindRead - case OpCreate, OpLink, OpRemove, OpRemoveAll, OpRename, OpReuseForRewrite, OpMkdirAll, OpLock, OpFileClose, OpFileWrite, OpFileSync, OpFileFlush: + case OpCreate, OpLink, OpRemove, OpRemoveAll, OpRename, OpReuseForRewrite, OpMkdirAll, OpLock, OpFileClose, OpFileWrite, OpFileSync, OpFileFlush, OpFilePreallocate: return OpKindWrite default: panic(fmt.Sprintf("unrecognized op %v\n", o)) @@ -363,6 +365,13 @@ func (f *errorFile) Stat() (os.FileInfo, error) { return f.file.Stat() } +func (f *errorFile) Preallocate(offset, length int64) error { + if err := f.inj.MaybeError(OpFilePreallocate, f.path); err != nil { + return err + } + return f.file.Preallocate(offset, length) +} + func (f *errorFile) Sync() error { if err := f.inj.MaybeError(OpFileSync, f.path); err != nil { return err @@ -370,6 +379,16 @@ func (f *errorFile) Sync() error { return f.file.Sync() } +func (f *errorFile) SyncData() error { + // TODO(jackson): Consider error injection. + return f.file.SyncData() +} + +func (f *errorFile) SyncTo(length int64) (fullSync bool, err error) { + // TODO(jackson): Consider error injection. + return f.file.SyncTo(length) +} + func (f *errorFile) Fd() uintptr { return f.file.Fd() } diff --git a/sstable/reader_test.go b/sstable/reader_test.go index eb1907955f..39ad52abc6 100644 --- a/sstable/reader_test.go +++ b/sstable/reader_test.go @@ -308,7 +308,7 @@ func TestInjectedErrors(t *testing.T) { for _, prebuiltSST := range prebuiltSSTs { run := func(i int) (reterr error) { - f, err := os.Open(filepath.FromSlash(prebuiltSST)) + f, err := vfs.Default.Open(filepath.FromSlash(prebuiltSST)) require.NoError(t, err) r, err := NewReader(errorfs.WrapFile(f, errorfs.OnIndex(int32(i))), ReaderOptions{}) if err != nil { diff --git a/sstable/table_test.go b/sstable/table_test.go index 1c5b4afe3c..e9c15b7d04 100644 --- a/sstable/table_test.go +++ b/sstable/table_test.go @@ -414,7 +414,7 @@ func build( func testReader(t *testing.T, filename string, comparer *Comparer, fp FilterPolicy) { // Check that we can read a pre-made table. - f, err := os.Open(filepath.FromSlash("testdata/" + filename)) + f, err := vfs.Default.Open(filepath.FromSlash("testdata/" + filename)) if err != nil { t.Error(err) return diff --git a/testdata/checkpoint b/testdata/checkpoint index 901ae85c9e..0bce607650 100644 --- a/testdata/checkpoint +++ b/testdata/checkpoint @@ -66,16 +66,16 @@ set a 1 set b 2 set c 3 ---- -sync: db/000002.log +sync-data: db/000002.log flush db ---- -sync: db/000002.log +sync-data: db/000002.log close: db/000002.log create: db/000004.log sync: db create: db/000005.sst -sync: db/000005.sst +sync-data: db/000005.sst close: db/000005.sst sync: db sync: db/MANIFEST-000001 @@ -85,16 +85,16 @@ set b 5 set d 7 set e 8 ---- -sync: db/000004.log +sync-data: db/000004.log flush db ---- -sync: db/000004.log +sync-data: db/000004.log close: db/000004.log reuseForWrite: db/000002.log -> db/000006.log sync: db create: db/000007.sst -sync: db/000007.sst +sync-data: db/000007.sst close: db/000007.sst sync: db sync: db/MANIFEST-000001 @@ -103,7 +103,7 @@ batch db set f 9 set g 10 ---- -sync: db/000006.log +sync-data: db/000006.log checkpoint db checkpoints/checkpoint1 ---- @@ -118,23 +118,23 @@ open-dir: checkpoints/checkpoint1 link: db/OPTIONS-000003 -> checkpoints/checkpoint1/OPTIONS-000003 open-dir: checkpoints/checkpoint1 create: checkpoints/checkpoint1/marker.format-version.000001.012 -sync: checkpoints/checkpoint1/marker.format-version.000001.012 +sync-data: checkpoints/checkpoint1/marker.format-version.000001.012 close: checkpoints/checkpoint1/marker.format-version.000001.012 sync: checkpoints/checkpoint1 close: checkpoints/checkpoint1 link: db/000005.sst -> checkpoints/checkpoint1/000005.sst link: db/000007.sst -> checkpoints/checkpoint1/000007.sst create: checkpoints/checkpoint1/MANIFEST-000001 -sync: checkpoints/checkpoint1/MANIFEST-000001 +sync-data: checkpoints/checkpoint1/MANIFEST-000001 close: checkpoints/checkpoint1/MANIFEST-000001 open-dir: checkpoints/checkpoint1 create: checkpoints/checkpoint1/marker.manifest.000001.MANIFEST-000001 -sync: checkpoints/checkpoint1/marker.manifest.000001.MANIFEST-000001 +sync-data: checkpoints/checkpoint1/marker.manifest.000001.MANIFEST-000001 close: checkpoints/checkpoint1/marker.manifest.000001.MANIFEST-000001 sync: checkpoints/checkpoint1 close: checkpoints/checkpoint1 create: checkpoints/checkpoint1/000006.log -sync: checkpoints/checkpoint1/000006.log +sync-data: checkpoints/checkpoint1/000006.log close: checkpoints/checkpoint1/000006.log sync: checkpoints/checkpoint1 close: checkpoints/checkpoint1 @@ -154,22 +154,22 @@ open-dir: checkpoints/checkpoint2 link: db/OPTIONS-000003 -> checkpoints/checkpoint2/OPTIONS-000003 open-dir: checkpoints/checkpoint2 create: checkpoints/checkpoint2/marker.format-version.000001.012 -sync: checkpoints/checkpoint2/marker.format-version.000001.012 +sync-data: checkpoints/checkpoint2/marker.format-version.000001.012 close: checkpoints/checkpoint2/marker.format-version.000001.012 sync: checkpoints/checkpoint2 close: checkpoints/checkpoint2 link: db/000007.sst -> checkpoints/checkpoint2/000007.sst create: checkpoints/checkpoint2/MANIFEST-000001 -sync: checkpoints/checkpoint2/MANIFEST-000001 +sync-data: checkpoints/checkpoint2/MANIFEST-000001 close: checkpoints/checkpoint2/MANIFEST-000001 open-dir: checkpoints/checkpoint2 create: checkpoints/checkpoint2/marker.manifest.000001.MANIFEST-000001 -sync: checkpoints/checkpoint2/marker.manifest.000001.MANIFEST-000001 +sync-data: checkpoints/checkpoint2/marker.manifest.000001.MANIFEST-000001 close: checkpoints/checkpoint2/marker.manifest.000001.MANIFEST-000001 sync: checkpoints/checkpoint2 close: checkpoints/checkpoint2 create: checkpoints/checkpoint2/000006.log -sync: checkpoints/checkpoint2/000006.log +sync-data: checkpoints/checkpoint2/000006.log close: checkpoints/checkpoint2/000006.log sync: checkpoints/checkpoint2 close: checkpoints/checkpoint2 @@ -185,40 +185,40 @@ open-dir: checkpoints/checkpoint3 link: db/OPTIONS-000003 -> checkpoints/checkpoint3/OPTIONS-000003 open-dir: checkpoints/checkpoint3 create: checkpoints/checkpoint3/marker.format-version.000001.012 -sync: checkpoints/checkpoint3/marker.format-version.000001.012 +sync-data: checkpoints/checkpoint3/marker.format-version.000001.012 close: checkpoints/checkpoint3/marker.format-version.000001.012 sync: checkpoints/checkpoint3 close: checkpoints/checkpoint3 link: db/000005.sst -> checkpoints/checkpoint3/000005.sst link: db/000007.sst -> checkpoints/checkpoint3/000007.sst create: checkpoints/checkpoint3/MANIFEST-000001 -sync: checkpoints/checkpoint3/MANIFEST-000001 +sync-data: checkpoints/checkpoint3/MANIFEST-000001 close: checkpoints/checkpoint3/MANIFEST-000001 open-dir: checkpoints/checkpoint3 create: checkpoints/checkpoint3/marker.manifest.000001.MANIFEST-000001 -sync: checkpoints/checkpoint3/marker.manifest.000001.MANIFEST-000001 +sync-data: checkpoints/checkpoint3/marker.manifest.000001.MANIFEST-000001 close: checkpoints/checkpoint3/marker.manifest.000001.MANIFEST-000001 sync: checkpoints/checkpoint3 close: checkpoints/checkpoint3 create: checkpoints/checkpoint3/000006.log -sync: checkpoints/checkpoint3/000006.log +sync-data: checkpoints/checkpoint3/000006.log close: checkpoints/checkpoint3/000006.log sync: checkpoints/checkpoint3 close: checkpoints/checkpoint3 compact db ---- -sync: db/000006.log +sync-data: db/000006.log close: db/000006.log reuseForWrite: db/000004.log -> db/000008.log sync: db create: db/000009.sst -sync: db/000009.sst +sync-data: db/000009.sst close: db/000009.sst sync: db sync: db/MANIFEST-000001 create: db/000010.sst -sync: db/000010.sst +sync-data: db/000010.sst close: db/000010.sst sync: db sync: db/MANIFEST-000001 @@ -226,7 +226,7 @@ sync: db/MANIFEST-000001 batch db set h 11 ---- -sync: db/000008.log +sync-data: db/000008.log list db ---- diff --git a/testdata/cleaner b/testdata/cleaner index 4f6259d4dc..9c8f9ac289 100644 --- a/testdata/cleaner +++ b/testdata/cleaner @@ -28,16 +28,16 @@ set a 1 set b 2 set c 3 ---- -sync: wal/000002.log +sync-data: wal/000002.log flush db ---- -sync: wal/000002.log +sync-data: wal/000002.log close: wal/000002.log create: wal/000004.log sync: wal create: db/000005.sst -sync: db/000005.sst +sync-data: db/000005.sst close: db/000005.sst sync: db sync: db/MANIFEST-000001 @@ -47,23 +47,23 @@ rename: wal/000002.log -> wal/archive/000002.log batch db set d 4 ---- -sync: wal/000004.log +sync-data: wal/000004.log compact db ---- -sync: wal/000004.log +sync-data: wal/000004.log close: wal/000004.log create: wal/000006.log sync: wal create: db/000007.sst -sync: db/000007.sst +sync-data: db/000007.sst close: db/000007.sst sync: db sync: db/MANIFEST-000001 mkdir-all: wal/archive 0755 rename: wal/000004.log -> wal/archive/000004.log create: db/000008.sst -sync: db/000008.sst +sync-data: db/000008.sst close: db/000008.sst sync: db sync: db/MANIFEST-000001 diff --git a/testdata/event_listener b/testdata/event_listener index 24911f9f17..383bff05ca 100644 --- a/testdata/event_listener +++ b/testdata/event_listener @@ -78,8 +78,8 @@ sync: db flush ---- -sync: wal/000002.log -sync: wal/000002.log +sync-data: wal/000002.log +sync-data: wal/000002.log close: wal/000002.log create: wal/000004.log sync: wal @@ -87,7 +87,7 @@ sync: wal [JOB 5] flushing 1 memtable to L0 create: db/000005.sst [JOB 5] flushing: sstable created 000005 -sync: db/000005.sst +sync-data: db/000005.sst close: db/000005.sst sync: db create: db/MANIFEST-000006 @@ -101,8 +101,8 @@ sync: db compact ---- -sync: wal/000004.log -sync: wal/000004.log +sync-data: wal/000004.log +sync-data: wal/000004.log close: wal/000004.log reuseForWrite: wal/000002.log -> wal/000007.log sync: wal @@ -110,7 +110,7 @@ sync: wal [JOB 7] flushing 1 memtable to L0 create: db/000008.sst [JOB 7] flushing: sstable created 000008 -sync: db/000008.sst +sync-data: db/000008.sst close: db/000008.sst sync: db create: db/MANIFEST-000009 @@ -125,7 +125,7 @@ sync: db [JOB 8] compacting(default) L0 [000005 000008] (1.5 K) + L6 [] (0 B) create: db/000010.sst [JOB 8] compacting: sstable created 000010 -sync: db/000010.sst +sync-data: db/000010.sst close: db/000010.sst sync: db create: db/MANIFEST-000011 @@ -145,8 +145,8 @@ disable-file-deletions flush ---- -sync: wal/000007.log -sync: wal/000007.log +sync-data: wal/000007.log +sync-data: wal/000007.log close: wal/000007.log reuseForWrite: wal/000004.log -> wal/000012.log sync: wal @@ -154,7 +154,7 @@ sync: wal [JOB 10] flushing 1 memtable to L0 create: db/000013.sst [JOB 10] flushing: sstable created 000013 -sync: db/000013.sst +sync-data: db/000013.sst close: db/000013.sst sync: db create: db/MANIFEST-000014 @@ -227,7 +227,7 @@ open-dir: checkpoint link: db/OPTIONS-000003 -> checkpoint/OPTIONS-000003 open-dir: checkpoint create: checkpoint/marker.format-version.000001.012 -sync: checkpoint/marker.format-version.000001.012 +sync-data: checkpoint/marker.format-version.000001.012 close: checkpoint/marker.format-version.000001.012 sync: checkpoint close: checkpoint @@ -235,16 +235,16 @@ link: db/000013.sst -> checkpoint/000013.sst link: db/000015.sst -> checkpoint/000015.sst link: db/000010.sst -> checkpoint/000010.sst create: checkpoint/MANIFEST-000016 -sync: checkpoint/MANIFEST-000016 +sync-data: checkpoint/MANIFEST-000016 close: checkpoint/MANIFEST-000016 open-dir: checkpoint create: checkpoint/marker.manifest.000001.MANIFEST-000016 -sync: checkpoint/marker.manifest.000001.MANIFEST-000016 +sync-data: checkpoint/marker.manifest.000001.MANIFEST-000016 close: checkpoint/marker.manifest.000001.MANIFEST-000016 sync: checkpoint close: checkpoint create: checkpoint/000012.log -sync: checkpoint/000012.log +sync-data: checkpoint/000012.log close: checkpoint/000012.log sync: checkpoint close: checkpoint @@ -256,7 +256,7 @@ pebble: file deletion disablement invariant violated close ---- close: db -sync: wal/000012.log +sync-data: wal/000012.log close: wal/000012.log close: db/MANIFEST-000016 close: db diff --git a/vfs/default_linux.go b/vfs/default_linux.go new file mode 100644 index 0000000000..b8b959afb7 --- /dev/null +++ b/vfs/default_linux.go @@ -0,0 +1,126 @@ +// Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +//go:build linux && !arm +// +build linux,!arm + +package vfs + +import ( + "os" + "syscall" + + "github.com/cockroachdb/errors" + "golang.org/x/sys/unix" +) + +func wrapOSFileImpl(f *os.File) File { + lf := &linuxFile{File: f, fd: f.Fd()} + if lf.fd != InvalidFd { + lf.useSyncRange = isSyncRangeSupported(lf.fd) + } + return lf +} + +func (defaultFS) OpenDir(name string) (File, error) { + f, err := os.OpenFile(name, syscall.O_CLOEXEC, 0) + if err != nil { + return nil, errors.WithStack(err) + } + return &linuxDir{f}, nil +} + +// Assert that linuxFile and linuxDir implement vfs.File. +var ( + _ File = (*linuxDir)(nil) + _ File = (*linuxFile)(nil) +) + +type linuxDir struct { + *os.File +} + +func (d *linuxDir) Preallocate(offset, length int64) error { return nil } +func (d *linuxDir) SyncData() error { return d.Sync() } +func (d *linuxDir) SyncTo(offset int64) (fullSync bool, err error) { return false, nil } + +type linuxFile struct { + *os.File + fd uintptr + useSyncRange bool +} + +func (f *linuxFile) Preallocate(offset, length int64) error { + return preallocExtend(f.fd, offset, length) +} + +func (f *linuxFile) SyncData() error { + return unix.Fdatasync(int(f.fd)) +} + +func (f *linuxFile) SyncTo(offset int64) (fullSync bool, err error) { + if !f.useSyncRange { + // Use fdatasync, which does provide persistence guarantees but won't + // update all file metadata. From the `fdatasync` man page: + // + // fdatasync() is similar to fsync(), but does not flush modified + // metadata unless that metadata is needed in order to allow a + // subsequent data retrieval to be correctly handled. For example, + // changes to st_atime or st_mtime (respectively, time of last access + // and time of last modification; see stat(2)) do not require flushing + // because they are not necessary for a subsequent data read to be + // handled correctly. On the other hand, a change to the file size + // (st_size, as made by say ftruncate(2)), would require a metadata + // flush. + if err = unix.Fdatasync(int(f.fd)); err != nil { + return false, err + } + return true, nil + } + + const ( + waitBefore = 0x1 + write = 0x2 + // waitAfter = 0x4 + ) + + // By specifying write|waitBefore for the flags, we're instructing + // SyncFileRange to a) wait for any outstanding data being written to finish, + // and b) to queue any other dirty data blocks in the range [0,offset] for + // writing. The actual writing of this data will occur asynchronously. The + // use of `waitBefore` is to limit how much dirty data is allowed to + // accumulate. Linux sometimes behaves poorly when a large amount of dirty + // data accumulates, impacting other I/O operations. + return false, unix.SyncFileRange(int(f.fd), 0, offset, write|waitBefore) +} + +type syncFileRange func(fd int, off int64, n int64, flags int) (err error) + +// sync_file_range depends on both the filesystem, and the broader kernel +// support. In particular, Windows Subsystem for Linux does not support +// sync_file_range, even when used with ext{2,3,4}. syncRangeSmokeTest performs +// a test of of sync_file_range, returning false on ENOSYS, and true otherwise. +func syncRangeSmokeTest(fd uintptr, syncFn syncFileRange) bool { + err := syncFn(int(fd), 0 /* offset */, 0 /* nbytes */, 0 /* flags */) + return err != unix.ENOSYS +} + +func isSyncRangeSupported(fd uintptr) bool { + var stat unix.Statfs_t + if err := unix.Fstatfs(int(fd), &stat); err != nil { + return false + } + + // Allowlist which filesystems we allow using sync_file_range with as some + // filesystems treat that syscall as a noop (notably ZFS). A allowlist is + // used instead of a denylist in order to have a more graceful failure mode + // in case a filesystem we haven't tested is encountered. Currently only + // ext2/3/4 are known to work properly. + const extMagic = 0xef53 + switch stat.Type { + case extMagic: + return syncRangeSmokeTest(fd, unix.SyncFileRange) + } + return false +} diff --git a/vfs/default_unix.go b/vfs/default_unix.go new file mode 100644 index 0000000000..b3527a1dd8 --- /dev/null +++ b/vfs/default_unix.go @@ -0,0 +1,50 @@ +// Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +//go:build darwin || dragonfly || freebsd || (linux && arm) || netbsd || openbsd || solaris +// +build darwin dragonfly freebsd linux,arm netbsd openbsd solaris + +package vfs + +import ( + "os" + "syscall" + + "github.com/cockroachdb/errors" +) + +func wrapOSFileImpl(osFile *os.File) File { + return &unixFile{File: osFile, fd: osFile.Fd()} +} + +func (defaultFS) OpenDir(name string) (File, error) { + f, err := os.OpenFile(name, syscall.O_CLOEXEC, 0) + if err != nil { + return nil, errors.WithStack(err) + } + return &unixFile{f, InvalidFd}, nil +} + +// Assert that unixFile implements vfs.File. +var _ File = (*unixFile)(nil) + +type unixFile struct { + *os.File + fd uintptr +} + +func (f *unixFile) Preallocate(offset, length int64) error { + return preallocExtend(f.fd, offset, length) +} + +func (f *unixFile) SyncData() error { + return f.Sync() +} + +func (f *unixFile) SyncTo(int64) (fullSync bool, err error) { + if err = f.Sync(); err != nil { + return false, err + } + return true, nil +} diff --git a/vfs/default_windows.go b/vfs/default_windows.go new file mode 100644 index 0000000000..f283573de0 --- /dev/null +++ b/vfs/default_windows.go @@ -0,0 +1,64 @@ +// Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +//go:build windows +// +build windows + +package vfs + +import ( + "os" + "syscall" + + "github.com/cockroachdb/errors" +) + +func wrapOSFileImpl(f *os.File) File { + return &windowsFile{f} +} + +func (defaultFS) OpenDir(name string) (File, error) { + f, err := os.OpenFile(name, syscall.O_CLOEXEC, 0) + if err != nil { + return nil, errors.WithStack(err) + } + return &windowsDir{f}, nil +} + +// Assert that windowsFile and windowsDir implement vfs.File. +var ( + _ File = (*windowsFile)(nil) + _ File = (*windowsDir)(nil) +) + +type windowsDir struct { + *os.File +} + +func (*windowsDir) Preallocate(off, length int64) error { return nil } + +// Silently ignore Sync() on Windows. This is the same behavior as +// RocksDB. See port/win/io_win.cc:WinDirectory::Fsync(). +func (*windowsDir) Sync() error { return nil } +func (*windowsDir) SyncData() error { return nil } +func (*windowsDir) SyncTo(length int64) (fullSync bool, err error) { return false, nil } + +type windowsFile struct { + *os.File +} + +func (*windowsFile) Preallocate(offset, length int64) error { + // It is ok for correctness to no-op file preallocation. WAL recycling is the + // more important mechanism for WAL sync performance and it doesn't rely on + // fallocate or posix_fallocate in order to be effective. + return nil +} + +func (f *windowsFile) SyncData() error { return f.Sync() } +func (f *windowsFile) SyncTo(length int64) (fullSync bool, err error) { + if err = f.Sync(); err != nil { + return false, err + } + return true, nil +} diff --git a/vfs/dir_unix.go b/vfs/dir_unix.go deleted file mode 100644 index c27aba9504..0000000000 --- a/vfs/dir_unix.go +++ /dev/null @@ -1,20 +0,0 @@ -// Copyright 2014 The LevelDB-Go and Pebble Authors. All rights reserved. Use -// of this source code is governed by a BSD-style license that can be found in -// the LICENSE file. - -//go:build darwin || dragonfly || freebsd || linux || netbsd || openbsd || solaris -// +build darwin dragonfly freebsd linux netbsd openbsd solaris - -package vfs - -import ( - "os" - "syscall" - - "github.com/cockroachdb/errors" -) - -func (defaultFS) OpenDir(name string) (File, error) { - f, err := os.OpenFile(name, syscall.O_CLOEXEC, 0) - return f, errors.WithStack(err) -} diff --git a/vfs/dir_windows.go b/vfs/dir_windows.go deleted file mode 100644 index d53690ac16..0000000000 --- a/vfs/dir_windows.go +++ /dev/null @@ -1,33 +0,0 @@ -// Copyright 2014 The LevelDB-Go and Pebble Authors. All rights reserved. Use -// of this source code is governed by a BSD-style license that can be found in -// the LICENSE file. - -//go:build windows -// +build windows - -package vfs - -import ( - "os" - "syscall" - - "github.com/cockroachdb/errors" -) - -type windowsDir struct { - File -} - -func (windowsDir) Sync() error { - // Silently ignore Sync() on Windows. This is the same behavior as - // RocksDB. See port/win/io_win.cc:WinDirectory::Fsync(). - return nil -} - -func (defaultFS) OpenDir(name string) (File, error) { - f, err := os.OpenFile(name, syscall.O_CLOEXEC, 0) - if err != nil { - return nil, errors.WithStack(err) - } - return windowsDir{f}, nil -} diff --git a/vfs/disk_full.go b/vfs/disk_full.go index ce84b2dc76..6da89f314e 100644 --- a/vfs/disk_full.go +++ b/vfs/disk_full.go @@ -377,6 +377,10 @@ func (f *enospcFile) Write(p []byte) (n int, err error) { return n, err } +func (f *enospcFile) Preallocate(offset, length int64) error { + return f.inner.Preallocate(offset, length) +} + func (f *enospcFile) Stat() (os.FileInfo, error) { return f.inner.Stat() } @@ -401,6 +405,14 @@ func (f *enospcFile) Sync() error { return err } +func (f *enospcFile) SyncData() error { + return f.inner.SyncData() +} + +func (f *enospcFile) SyncTo(length int64) (fullSync bool, err error) { + return f.inner.SyncTo(length) +} + func (f *enospcFile) Fd() uintptr { return f.inner.Fd() } diff --git a/vfs/disk_health.go b/vfs/disk_health.go index 2f06effcc2..87fc9e1ecf 100644 --- a/vfs/disk_health.go +++ b/vfs/disk_health.go @@ -40,9 +40,12 @@ const ( OpTypeUnknown OpType = iota OpTypeWrite OpTypeSync + OpTypeSyncData + OpTypeSyncTo OpTypeCreate OpTypeLink OpTypeMkdirAll + OpTypePreallocate OpTypeRemove OpTypeRemoveAll OpTypeRename @@ -59,12 +62,18 @@ func (o OpType) String() string { return "write" case OpTypeSync: return "sync" + case OpTypeSyncData: + return "syncdata" + case OpTypeSyncTo: + return "syncto" case OpTypeCreate: return "create" case OpTypeLink: return "link" case OpTypeMkdirAll: return "mkdirall" + case OpTypePreallocate: + return "preallocate" case OpTypeRemove: return "remove" case OpTypeRemoveAll: @@ -72,7 +81,7 @@ func (o OpType) String() string { case OpTypeRename: return "rename" case OpTypeReuseForWrite: - return "reuseforwrtie" + return "reuseforwrite" case OpTypeUnknown: return "unknown" default: @@ -88,8 +97,7 @@ func (o OpType) String() string { // duration. This setup is preferable to creating a new timer at every disk // operation, as it reduces overhead per disk operation. type diskHealthCheckingFile struct { - File - + file File onSlowDisk func(OpType, time.Duration) diskSlowThreshold time.Duration tickInterval time.Duration @@ -115,7 +123,7 @@ func newDiskHealthCheckingFile( file File, diskSlowThreshold time.Duration, onSlowDisk func(OpType, time.Duration), ) *diskHealthCheckingFile { return &diskHealthCheckingFile{ - File: file, + file: file, onSlowDisk: onSlowDisk, diskSlowThreshold: diskSlowThreshold, tickInterval: defaultTickInterval, @@ -164,10 +172,25 @@ func (d *diskHealthCheckingFile) stopTicker() { close(d.stopper) } +// Fd implements (vfs.File).Fd. +func (d *diskHealthCheckingFile) Fd() uintptr { + return d.file.Fd() +} + +// Read implements (vfs.File).Read +func (d *diskHealthCheckingFile) Read(p []byte) (int, error) { + return d.file.Read(p) +} + +// ReadAt implements (vfs.File).ReadAt +func (d *diskHealthCheckingFile) ReadAt(p []byte, off int64) (int, error) { + return d.file.ReadAt(p, off) +} + // Write implements the io.Writer interface. func (d *diskHealthCheckingFile) Write(p []byte) (n int, err error) { d.timeDiskOp(OpTypeWrite, func() { - n, err = d.File.Write(p) + n, err = d.file.Write(p) }) return n, err } @@ -175,17 +198,46 @@ func (d *diskHealthCheckingFile) Write(p []byte) (n int, err error) { // Close implements the io.Closer interface. func (d *diskHealthCheckingFile) Close() error { d.stopTicker() - return d.File.Close() + return d.file.Close() +} + +// Preallocate implements (vfs.File).Preallocate. +func (d *diskHealthCheckingFile) Preallocate(off, n int64) (err error) { + d.timeDiskOp(OpTypePreallocate, func() { + err = d.file.Preallocate(off, n) + }) + return err +} + +// Stat implements (vfs.File).Stat. +func (d *diskHealthCheckingFile) Stat() (os.FileInfo, error) { + return d.file.Stat() } // Sync implements the io.Syncer interface. func (d *diskHealthCheckingFile) Sync() (err error) { d.timeDiskOp(OpTypeSync, func() { - err = d.File.Sync() + err = d.file.Sync() + }) + return err +} + +// SyncData implements (vfs.File).SyncData. +func (d *diskHealthCheckingFile) SyncData() (err error) { + d.timeDiskOp(OpTypeSyncData, func() { + err = d.file.SyncData() }) return err } +// SyncTo implements (vfs.File).SyncTo. +func (d *diskHealthCheckingFile) SyncTo(length int64) (fullSync bool, err error) { + d.timeDiskOp(OpTypeSyncTo, func() { + fullSync, err = d.file.SyncTo(length) + }) + return fullSync, err +} + // timeDiskOp runs the specified closure and makes its timing visible to the // monitoring goroutine, in case it exceeds one of the slow disk durations. func (d *diskHealthCheckingFile) timeDiskOp(opType OpType, op func()) { diff --git a/vfs/disk_health_test.go b/vfs/disk_health_test.go index dc4c76369b..2bdcc9634f 100644 --- a/vfs/disk_health_test.go +++ b/vfs/disk_health_test.go @@ -37,6 +37,11 @@ func (m mockFile) Write(p []byte) (n int, err error) { return len(p), nil } +func (m mockFile) Preallocate(int64, int64) error { + time.Sleep(m.syncAndWriteDuration) + return nil +} + func (m mockFile) Stat() (os.FileInfo, error) { panic("unimplemented") } @@ -50,6 +55,16 @@ func (m mockFile) Sync() error { return nil } +func (m mockFile) SyncData() error { + time.Sleep(m.syncAndWriteDuration) + return nil +} + +func (m mockFile) SyncTo(int64) (fullSync bool, err error) { + time.Sleep(m.syncAndWriteDuration) + return false, nil +} + var _ File = &mockFile{} type mockFS struct { @@ -187,8 +202,8 @@ var _ FS = &mockFS{} func TestDiskHealthChecking_File(t *testing.T) { const ( - slowThreshold = 1 * time.Second - syncDuration = 3 * time.Second + slowThreshold = 1 * time.Second + syncAndWriteDuration = 3 * time.Second ) testCases := []struct { op OpType @@ -211,7 +226,7 @@ func TestDiskHealthChecking_File(t *testing.T) { } diskSlow := make(chan info, 1) mockFS := &mockFS{create: func(name string) (File, error) { - return mockFile{syncAndWriteDuration: syncDuration}, nil + return mockFile{syncAndWriteDuration: syncAndWriteDuration}, nil }} fs, closer := WithDiskHealthChecks(mockFS, slowThreshold, func(s string, opType OpType, duration time.Duration) { diff --git a/vfs/mem_fs.go b/vfs/mem_fs.go index 4b2bb6f75c..d63886647f 100644 --- a/vfs/mem_fs.go +++ b/vfs/mem_fs.go @@ -698,6 +698,10 @@ func (f *memFile) Write(p []byte) (int, error) { return len(p), nil } +func (f *memFile) Preallocate(offset, length int64) error { + return nil +} + func (f *memFile) Stat() (os.FileInfo, error) { return f.n, nil } @@ -723,6 +727,18 @@ func (f *memFile) Sync() error { return nil } +func (f *memFile) SyncData() error { + return f.Sync() +} + +func (f *memFile) SyncTo(length int64) (fullSync bool, err error) { + // NB: This SyncTo implementation lies, with its return values claiming it + // synced the data up to `length`. When fullSync=false, SyncTo provides no + // durability guarantees, so this can help surface bugs where we improperly + // rely on SyncTo providing durability. + return false, nil +} + func (f *memFile) Fd() uintptr { return InvalidFd } diff --git a/vfs/syncing_file.go b/vfs/syncing_file.go index 6224f3a94e..3a0f33f100 100644 --- a/vfs/syncing_file.go +++ b/vfs/syncing_file.go @@ -12,6 +12,8 @@ import ( // SyncingFileOptions holds the options for a syncingFile. type SyncingFileOptions struct { + // NoSyncOnClose elides the automatic Sync during Close if it's not possible + // to sync the remainder of the file in a non-blocking way. NoSyncOnClose bool BytesPerSync int PreallocateSize int @@ -21,8 +23,6 @@ type syncingFile struct { File // fd can be InvalidFd if the underlying File does not support it. fd uintptr - useSyncRange bool - closing bool noSyncOnClose bool bytesPerSync int64 preallocateSize int64 @@ -37,9 +37,6 @@ type syncingFile struct { syncOffset int64 } preallocatedBlocks int64 - syncData func() error - syncTo func(offset int64) error - timeDiskOp func(op func()) } // NewSyncingFile wraps a writable file and ensures that data is synced @@ -58,23 +55,6 @@ func NewSyncingFile(f File, opts SyncingFileOptions) File { // Ensure a file that is opened and then closed will be synced, even if no // data has been written to it. s.atomic.syncOffset = -1 - - type dhChecker interface { - timeDiskOp(op func()) - } - if d, ok := f.(dhChecker); ok { - s.timeDiskOp = d.timeDiskOp - } else { - s.timeDiskOp = func(op func()) { - op() - } - } - - s.init() - - if s.syncData == nil { - s.syncData = s.File.Sync - } return s } @@ -108,7 +88,7 @@ func (f *syncingFile) preallocate(offset int64) error { length := f.preallocateSize * (newPreallocatedBlocks - f.preallocatedBlocks) offset = f.preallocateSize * f.preallocatedBlocks f.preallocatedBlocks = newPreallocatedBlocks - return preallocExtend(f.fd, offset, length) + return f.Preallocate(offset, length) } func (f *syncingFile) ratchetSyncOffset(offset int64) { @@ -127,9 +107,10 @@ func (f *syncingFile) Sync() error { // We update syncOffset (atomically) in order to avoid spurious syncs in // maybeSync. Note that even if syncOffset is larger than the current file // offset, we still need to call the underlying file's sync for persistence - // guarantees (which are not provided by sync_file_range). + // guarantees which are not provided by SyncTo (or by sync_file_range on + // Linux). f.ratchetSyncOffset(atomic.LoadInt64(&f.atomic.offset)) - return f.syncData() + return f.SyncData() } func (f *syncingFile) maybeSync() error { @@ -163,25 +144,44 @@ func (f *syncingFile) maybeSync() error { return errors.WithStack(f.Sync()) } - // Note that syncTo will always be called with an offset < atomic.offset. The - // syncTo implementation may choose to sync the entire file (i.e. on OSes - // which do not support syncing a portion of the file). The syncTo - // implementation must call ratchetSyncOffset with as much of the file as it - // has synced. - return errors.WithStack(f.syncTo(syncToOffset)) + // Note that SyncTo will always be called with an offset < atomic.offset. + // The SyncTo implementation may choose to sync the entire file (i.e. on + // OSes which do not support syncing a portion of the file). + fullSync, err := f.SyncTo(syncToOffset) + if err != nil { + return errors.WithStack(err) + } + if fullSync { + f.ratchetSyncOffset(offset) + } else { + f.ratchetSyncOffset(syncToOffset) + } + return nil } func (f *syncingFile) Close() error { // Sync any data that has been written but not yet synced unless the file // has noSyncOnClose option explicitly set. - // Note that if SyncFileRange was used, atomic.syncOffset will be less than - // atomic.offset. See syncingFile.syncToRange. - f.closing = true - if !f.noSyncOnClose || f.useSyncRange { - if atomic.LoadInt64(&f.atomic.offset) > atomic.LoadInt64(&f.atomic.syncOffset) { - if err := f.Sync(); err != nil { - return errors.WithStack(err) + // + // NB: If the file is capable of non-durability-guarantee SyncTos, and the + // caller has not called Sync since the last write, syncOffset is guaranteed + // to be less than atomic.offset. This ensures we fall into the below + // conditional and perform a full sync to durably persist the file. + if off := atomic.LoadInt64(&f.atomic.offset); off > atomic.LoadInt64(&f.atomic.syncOffset) { + // There's still remaining dirty data. + + if f.noSyncOnClose { + // If NoSyncOnClose is set, only perform a SyncTo. On linux, SyncTo + // translates to a non-blocking `sync_file_range` call which + // provides no persistence guarantee. Since it's non-blocking, + // there's no latency hit of a blocking sync call, but we still + // ensure we're not allowing significant dirty data to accumulate. + if _, err := f.File.SyncTo(off); err != nil { + return err } + f.ratchetSyncOffset(off) + } else if err := f.Sync(); err != nil { + return errors.WithStack(err) } } return errors.WithStack(f.File.Close()) diff --git a/vfs/syncing_file_generic.go b/vfs/syncing_file_generic.go deleted file mode 100644 index 7835b722bd..0000000000 --- a/vfs/syncing_file_generic.go +++ /dev/null @@ -1,16 +0,0 @@ -// Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use -// of this source code is governed by a BSD-style license that can be found in -// the LICENSE file. - -//go:build !linux || arm -// +build !linux arm - -package vfs - -func (f *syncingFile) init() { - f.syncTo = f.syncToGeneric -} - -func (f *syncingFile) syncToGeneric(_ int64) error { - return f.Sync() -} diff --git a/vfs/syncing_file_linux.go b/vfs/syncing_file_linux.go deleted file mode 100644 index bdc008e481..0000000000 --- a/vfs/syncing_file_linux.go +++ /dev/null @@ -1,105 +0,0 @@ -// Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use -// of this source code is governed by a BSD-style license that can be found in -// the LICENSE file. - -//go:build linux && !arm -// +build linux,!arm - -package vfs - -import "golang.org/x/sys/unix" - -type syncFileRange func(fd int, off int64, n int64, flags int) (err error) - -// sync_file_range depends on both the filesystem, and the broader kernel -// support. In particular, Windows Subsystem for Linux does not support -// sync_file_range, even when used with ext{2,3,4}. syncRangeSmokeTest performs -// a test of of sync_file_range, returning false on ENOSYS, and true otherwise. -func syncRangeSmokeTest(fd uintptr, fn syncFileRange) bool { - err := fn(int(fd), 0 /* offset */, 0 /* nbytes */, 0 /* flags */) - return err != unix.ENOSYS -} - -func isSyncRangeSupported(fd uintptr) bool { - var stat unix.Statfs_t - if err := unix.Fstatfs(int(fd), &stat); err != nil { - return false - } - - // Allowlist which filesystems we allow using sync_file_range with as some - // filesystems treat that syscall as a noop (notably ZFS). A allowlist is - // used instead of a denylist in order to have a more graceful failure mode - // in case a filesystem we haven't tested is encountered. Currently only - // ext2/3/4 are known to work properly. - const extMagic = 0xef53 - switch stat.Type { - case extMagic: - return syncRangeSmokeTest(fd, unix.SyncFileRange) - } - return false -} - -func (f *syncingFile) init() { - if f.fd == InvalidFd { - return - } - f.timeDiskOp(func() { - f.useSyncRange = isSyncRangeSupported(f.fd) - }) - if f.useSyncRange { - f.syncTo = f.syncToRange - } else { - f.syncTo = f.syncToFdatasync - } - f.syncData = f.syncFdatasync -} - -func (f *syncingFile) syncFdatasync() error { - if f.fd == InvalidFd { - return f.File.Sync() - } - var err error - f.timeDiskOp(func() { - err = unix.Fdatasync(int(f.fd)) - }) - return err -} - -func (f *syncingFile) syncToFdatasync(_ int64) error { - return f.Sync() -} - -func (f *syncingFile) syncToRange(offset int64) error { - const ( - waitBefore = 0x1 - write = 0x2 - // waitAfter = 0x4 - ) - - // The flags for the sync_file_range system call. Unless the file has - // noSyncOnClose explicitly set and it is being closed, the waitBefore - // flag will be set which may block the call. - flags := write - if !f.noSyncOnClose || !f.closing { - flags |= waitBefore - } - - // Note that syncToRange is only called with an offset that is guaranteed to - // be less than atomic.offset (i.e. the write offset). This implies the - // syncingFile.Close will Sync the rest of the data, as well as the file's - // metadata. - f.ratchetSyncOffset(offset) - - // By specifying write|waitBefore for the flags, we're instructing - // SyncFileRange to a) wait for any outstanding data being written to finish, - // and b) to queue any other dirty data blocks in the range [0,offset] for - // writing. The actual writing of this data will occur asynchronously. The - // use of `waitBefore` is to limit how much dirty data is allowed to - // accumulate. Linux sometimes behaves poorly when a large amount of dirty - // data accumulates, impacting other I/O operations. - var err error - f.timeDiskOp(func() { - err = unix.SyncFileRange(int(f.fd), 0, offset, flags) - }) - return err -} diff --git a/vfs/syncing_file_test.go b/vfs/syncing_file_test.go index 0dfe19c7c7..7173e9143c 100644 --- a/vfs/syncing_file_test.go +++ b/vfs/syncing_file_test.go @@ -27,19 +27,9 @@ func TestSyncingFile(t *testing.T) { f, err := Default.Create(filename) require.NoError(t, err) - s := NewSyncingFile(f, SyncingFileOptions{}) - if s == f { - t.Fatalf("failed to wrap: %p != %p", f, s) - } - s = NewSyncingFile(f, SyncingFileOptions{BytesPerSync: 8 << 10 /* 8 KB */}) - s.(*syncingFile).fd = 1 - s.(*syncingFile).syncTo = func(offset int64) error { - s.(*syncingFile).ratchetSyncOffset(offset) - return nil - } - - t.Logf("sync_file_range=%t", s.(*syncingFile).useSyncRange) - + tf := &mockSyncToFile{File: f, canSyncTo: true} + sf := NewSyncingFile(tf, SyncingFileOptions{BytesPerSync: 8 << 10 /* 8 KB */}) + sf.(*syncingFile).fd = 1 testCases := []struct { n int64 expectedSyncTo int64 @@ -52,10 +42,10 @@ func TestSyncingFile(t *testing.T) { {16 << 10, mb + 32<<10}, } for i, c := range testCases { - _, err := s.Write(make([]byte, c.n)) + _, err := sf.Write(make([]byte, c.n)) require.NoError(t, err) - syncTo := atomic.LoadInt64(&s.(*syncingFile).atomic.syncOffset) + syncTo := atomic.LoadInt64(&sf.(*syncingFile).atomic.syncOffset) if c.expectedSyncTo != syncTo { t.Fatalf("%d: expected sync to %d, but found %d", i, c.expectedSyncTo, syncTo) } @@ -64,26 +54,26 @@ func TestSyncingFile(t *testing.T) { func TestSyncingFileClose(t *testing.T) { testCases := []struct { - syncToEnabled bool - expected string + canSyncTo bool + expected string }{ - {true, `sync-to(1048576): test [] -sync-to(2097152): test [] -sync-to(3145728): test [] + {true, `sync-to(1048576): test [false,] +sync-to(2097152): test [false,] +sync-to(3145728): test [false,] pre-close: test [offset=4194304 sync-offset=3145728] -sync: test [] +sync-data: test [] close: test [] `}, - // When SyncFileRange is not being used, the last sync call ends up syncing - // all of the data causing syncingFile.Close to elide the sync. - {false, `sync: test [] -sync: test [] + // When SyncTo is not being used, the last sync call ends up syncing all + // of the data causing syncingFile.Close to elide the sync. + {false, `sync-to(1048576): test [true,] +sync-to(3145728): test [true,] pre-close: test [offset=4194304 sync-offset=4194304] close: test [] `}, } for _, c := range testCases { - t.Run("", func(t *testing.T) { + t.Run(fmt.Sprintf("canSyncTo=%t", c.canSyncTo), func(t *testing.T) { tmpf, err := os.CreateTemp("", "pebble-db-syncing-file-") require.NoError(t, err) @@ -95,20 +85,9 @@ close: test [] require.NoError(t, err) var buf bytes.Buffer - lf := loggingFile{f, "test", &buf} - + tf := &mockSyncToFile{File: f, canSyncTo: c.canSyncTo} + lf := loggingFile{tf, "test", &buf} s := NewSyncingFile(lf, SyncingFileOptions{BytesPerSync: 8 << 10 /* 8 KB */}).(*syncingFile) - if c.syncToEnabled { - s.fd = 1 - s.syncData = lf.Sync - s.syncTo = func(offset int64) error { - s.ratchetSyncOffset(offset) - fmt.Fprintf(lf.w, "sync-to(%d): %s [%v]\n", offset, lf.name, err) - return nil - } - } else { - s.fd = InvalidFd - } write := func(n int64) { t.Helper() @@ -132,18 +111,38 @@ close: test [] } } +type mockSyncToFile struct { + File + canSyncTo bool +} + +func (f *mockSyncToFile) SyncTo(length int64) (fullSync bool, err error) { + if !f.canSyncTo { + if err = f.File.SyncData(); err != nil { + return false, err + } + return true, nil + } + // f.canSyncTo = true + if _, err = f.File.SyncTo(length); err != nil { + return false, err + } + // NB: If the underlying file performed a full sync, lie. + return false, nil +} + func TestSyncingFileNoSyncOnClose(t *testing.T) { testCases := []struct { - useSyncRange bool + useSyncTo bool expectBefore int64 expectAfter int64 }{ - {false, 2 << 20, 2 << 20}, + {false, 2 << 20, 3<<20 + 128}, {true, 2 << 20, 3<<20 + 128}, } for _, c := range testCases { - t.Run(fmt.Sprintf("useSyncRange=%v", c.useSyncRange), func(t *testing.T) { + t.Run(fmt.Sprintf("useSyncTo=%v", c.useSyncTo), func(t *testing.T) { tmpf, err := os.CreateTemp("", "pebble-db-syncing-file-") require.NoError(t, err) @@ -155,10 +154,9 @@ func TestSyncingFileNoSyncOnClose(t *testing.T) { require.NoError(t, err) var buf bytes.Buffer - lf := loggingFile{f, "test", &buf} - + tf := &mockSyncToFile{f, c.useSyncTo} + lf := loggingFile{tf, "test", &buf} s := NewSyncingFile(lf, SyncingFileOptions{NoSyncOnClose: true, BytesPerSync: 8 << 10}).(*syncingFile) - s.useSyncRange = c.useSyncRange write := func(n int64) { t.Helper() @@ -175,9 +173,13 @@ func TestSyncingFileNoSyncOnClose(t *testing.T) { require.NoError(t, s.Close()) syncToAfter := atomic.LoadInt64(&s.atomic.syncOffset) - if syncToBefore != c.expectBefore || syncToAfter != c.expectAfter { - t.Fatalf("Expected syncTo before and after closing are %d %d but found %d %d", - c.expectBefore, c.expectAfter, syncToBefore, syncToAfter) + // If we're not able to non-blockingly sync using sync-to, + // NoSyncOnClose should elide the sync. + if !c.useSyncTo { + if syncToBefore != c.expectBefore || syncToAfter != c.expectAfter { + t.Fatalf("Expected syncTo before and after closing are %d %d but found %d %d", + c.expectBefore, c.expectAfter, syncToBefore, syncToAfter) + } } }) } @@ -239,7 +241,7 @@ func BenchmarkSyncWrite(b *testing.B) { if err != nil { b.Fatal(err) } - return NewSyncingFile(t, SyncingFileOptions{PreallocateSize: 0}) + return NewSyncingFile(wrapOSFile(t), SyncingFileOptions{PreallocateSize: 0}) }) }) } @@ -254,7 +256,7 @@ func BenchmarkSyncWrite(b *testing.B) { if err != nil { b.Fatal(err) } - return NewSyncingFile(t, SyncingFileOptions{PreallocateSize: 4 << 20}) + return NewSyncingFile(wrapOSFile(t), SyncingFileOptions{PreallocateSize: 4 << 20}) }) }) } @@ -285,7 +287,7 @@ func BenchmarkSyncWrite(b *testing.B) { if err != nil { b.Fatal(err) } - return NewSyncingFile(t, SyncingFileOptions{PreallocateSize: 0}) + return NewSyncingFile(wrapOSFile(t), SyncingFileOptions{PreallocateSize: 0}) }) }) } diff --git a/vfs/vfs.go b/vfs/vfs.go index 9c983ac320..ea9e3210be 100644 --- a/vfs/vfs.go +++ b/vfs/vfs.go @@ -29,9 +29,36 @@ type File interface { // method *is* allowed to modify the slice passed in, whether temporarily // or permanently. Callers of Write() need to take this into account. io.Writer + + // Preallocate optionally preallocates storage for `length` at `offset` + // within the file. Implementations may choose to do nothing. + Preallocate(offset, length int64) error Stat() (os.FileInfo, error) Sync() error + // SyncTo requests that a prefix of the file's data be synced to stable + // storage. The caller passes provides a `length`, indicating how many bytes + // to sync from the beginning of the file. SyncTo is a no-op for + // directories, and therefore always returns false. + // + // SyncTo returns a fullSync return value, indicating one of two possible + // outcomes. + // + // If fullSync is false, the first `length` bytes of the file was queued to + // be synced to stable storage. The syncing of the file prefix may happen + // asynchronously. No persistence guarantee is provided. + // + // If fullSync is true, the entirety of the file's contents were + // synchronously synced to stable storage, and a persistence guarantee is + // provided. In this outcome, any modified metadata for the file is not + // guaranteed to be synced unless that metadata is needed in order to allow + // a subsequent data retrieval to be correctly handled. + SyncTo(length int64) (fullSync bool, err error) + + // SyncData requires that all written data be persisted. File metadata is + // not required to be synced. Unsophisticated implementations may call Sync. + SyncData() error + // Fd returns the raw file descriptor when a File is backed by an *os.File. // It can be used for specific functionality like Prefetch. // Returns InvalidFd if not supported. @@ -155,10 +182,18 @@ var Default FS = defaultFS{} type defaultFS struct{} +// wrapOSFile takes a standard library OS file and returns a vfs.File. f may be +// nil, in which case wrapOSFile must not panic. In such cases, it's okay if the +// returned vfs.File may panic if used. +func wrapOSFile(f *os.File) File { + // See the implementations in default_{linux,unix,windows}.go. + return wrapOSFileImpl(f) +} + func (defaultFS) Create(name string) (File, error) { const openFlags = os.O_RDWR | os.O_CREATE | os.O_EXCL | syscall.O_CLOEXEC - f, err := os.OpenFile(name, openFlags, 0666) + osFile, err := os.OpenFile(name, openFlags, 0666) // If the file already exists, remove it and try again. // // NB: We choose to remove the file instead of truncating it, despite the @@ -169,11 +204,11 @@ func (defaultFS) Create(name string) (File, error) { // attempting to create the a file at the same path. for oserror.IsExist(err) { if removeErr := os.Remove(name); removeErr != nil && !oserror.IsNotExist(removeErr) { - return f, errors.WithStack(removeErr) + return wrapOSFile(osFile), errors.WithStack(removeErr) } - f, err = os.OpenFile(name, openFlags, 0666) + osFile, err = os.OpenFile(name, openFlags, 0666) } - return f, errors.WithStack(err) + return wrapOSFile(osFile), errors.WithStack(err) } func (defaultFS) Link(oldname, newname string) error { @@ -181,10 +216,11 @@ func (defaultFS) Link(oldname, newname string) error { } func (defaultFS) Open(name string, opts ...OpenOption) (File, error) { - file, err := os.OpenFile(name, os.O_RDONLY|syscall.O_CLOEXEC, 0) + osFile, err := os.OpenFile(name, os.O_RDONLY|syscall.O_CLOEXEC, 0) if err != nil { return nil, errors.WithStack(err) } + file := wrapOSFile(osFile) for _, opt := range opts { opt.Apply(file) } @@ -208,7 +244,7 @@ func (fs defaultFS) ReuseForWrite(oldname, newname string) (File, error) { return nil, errors.WithStack(err) } f, err := os.OpenFile(newname, os.O_RDWR|os.O_CREATE|syscall.O_CLOEXEC, 0666) - return f, errors.WithStack(err) + return wrapOSFile(f), errors.WithStack(err) } func (defaultFS) MkdirAll(dir string, perm os.FileMode) error { diff --git a/vfs/vfs_test.go b/vfs/vfs_test.go index 661ea1cc50..0b1289a0d3 100644 --- a/vfs/vfs_test.go +++ b/vfs/vfs_test.go @@ -112,12 +112,30 @@ func (f loggingFile) Close() error { return err } +func (f loggingFile) Preallocate(off, n int64) error { + err := f.File.Preallocate(off, n) + fmt.Fprintf(f.w, "preallocate(off=%d,n=%d): %s [%v]\n", off, n, f.name, err) + return err +} + func (f loggingFile) Sync() error { err := f.File.Sync() fmt.Fprintf(f.w, "sync: %s [%v]\n", f.name, err) return err } +func (f loggingFile) SyncData() error { + err := f.File.SyncData() + fmt.Fprintf(f.w, "sync-data: %s [%v]\n", f.name, err) + return err +} + +func (f loggingFile) SyncTo(length int64) (fullSync bool, err error) { + fullSync, err = f.File.SyncTo(length) + fmt.Fprintf(f.w, "sync-to(%d): %s [%t,%v]\n", length, f.name, fullSync, err) + return fullSync, err +} + func runTestVFS(t *testing.T, baseFS FS, dir string) { var buf bytes.Buffer fs := loggingFS{FS: baseFS, base: dir, w: &buf}