From d79f9617c9c5205ea26c28838ad3a1918d8fe6d1 Mon Sep 17 00:00:00 2001 From: Jackson Owens Date: Fri, 29 Apr 2022 16:11:32 -0400 Subject: [PATCH] vfs: extend disk-health checking to filesystem metadata operations Previously, disk stalls while syncing a filesystem directory were undetected, because the health-checking filesystem's OpenDir method did not wrap the directory handle. Additionally, disk stalls during all filesystem metadata operations (eg, file creation, removal, rename, etc) were undetected. These operations are performed directly on a vfs.FS (not a vfs.File), and require an adjusted implementation to accommodate concurrent, write-oriented operations. This commit implements a scheme in which filesystem metadata operations lock a vfs.FS-wide mutex to acquire a 'slot' into which they record their start time. When an operation completes, the operation atomically zeroes its slot without reacquiring the mutex. A long-running goroutine listens on a ticker, periodically acquiring the vfs.FS mutex and scanning the slots for start times older than a threshold. The long-running goroutine is launched lazily by the first filesystem operation to acquire the mutex. A new `io.Closer` return value of `WithDiskHealthChecks` may be used to stop the long-running goroutine. Fix #1643. --- db.go | 5 + options.go | 13 +- vfs/disk_health.go | 350 ++++++++++++++++++++++++++++++++++++++-- vfs/disk_health_test.go | 312 +++++++++++++++++++++++++++++++++++ vfs/disk_heath_test.go | 136 ---------------- vfs/fd_test.go | 4 +- vfs/vfs.go | 3 + 7 files changed, 669 insertions(+), 154 deletions(-) create mode 100644 vfs/disk_health_test.go delete mode 100644 vfs/disk_heath_test.go diff --git a/db.go b/db.go index c2a5ed0f00..9f3fcfd382 100644 --- a/db.go +++ b/db.go @@ -1241,6 +1241,11 @@ func (d *DB) Close() error { d.deleters.Wait() d.compactionSchedulers.Wait() d.mu.Lock() + + // If the options include a closer to 'close' the filesystem, close it. + if d.opts.private.fsCloser != nil { + d.opts.private.fsCloser.Close() + } return err } diff --git a/options.go b/options.go index 9bdd9b9a06..ef17dcb521 100644 --- a/options.go +++ b/options.go @@ -7,6 +7,7 @@ package pebble import ( "bytes" "fmt" + "io" "runtime" "strconv" "strings" @@ -703,6 +704,16 @@ type Options struct { // default is 1 MB/s. Currently disabled as this option has no effect while // private.enablePacing is false. minFlushRate int + + // fsCloser holds a closer that should be invoked after a DB using these + // Options is closed. This is used to automatically stop the + // long-running goroutine associated with the disk-health-checking FS. + // See the initialization of FS in EnsureDefaults. Note that care has + // been taken to ensure that it is still safe to continue using the FS + // after this closer has been invoked. However, if write operations + // against the FS are made after the DB is closed, the FS may leak a + // goroutine indefinitely. 
+ fsCloser io.Closer } } @@ -826,7 +837,7 @@ func (o *Options) EnsureDefaults() *Options { } if o.FS == nil { - o.FS = vfs.WithDiskHealthChecks(vfs.Default, 5*time.Second, + o.FS, o.private.fsCloser = vfs.WithDiskHealthChecks(vfs.Default, 5*time.Second, func(name string, duration time.Duration) { o.EventListener.DiskSlow(DiskSlowInfo{ Path: name, diff --git a/vfs/disk_health.go b/vfs/disk_health.go index 19fdeb6174..e7b0b129b1 100644 --- a/vfs/disk_health.go +++ b/vfs/disk_health.go @@ -5,6 +5,9 @@ package vfs import ( + "io" + "os" + "sync" "sync/atomic" "time" ) @@ -13,6 +16,12 @@ const ( // defaultTickInterval is the default interval between two ticks of each // diskHealthCheckingFile loop iteration. defaultTickInterval = 2 * time.Second + // preallocatedSlotCount is the default number of slots available for + // concurrent filesystem operations. The slot count may be exceeded, but + // each additional slot will incur an additional allocation. We choose 16 + // here with the expectation that it is significantly more than required in + // practice. See the comment above the diskHealthCheckingFS type definition. + preallocatedSlotCount = 16 ) // diskHealthCheckingFile is a File wrapper to detect slow disk operations, and @@ -123,30 +132,231 @@ func (d *diskHealthCheckingFile) timeDiskOp(op func()) { op() } +// diskHealthCheckingFS adds disk-health checking facilities to a VFS. +// It times disk write operations in two ways: +// +// 1. Wrapping vfs.Files. +// +// The bulk of write I/O activity is file writing and syncing, invoked through +// the `vfs.File` interface. This VFS wraps all files open for writing with a +// special diskHealthCheckingFile implementation of the vfs.File interface. See +// above for the implementation. +// +// 2. Monitoring filesystem metadata operations. +// +// Filesystem metadata operations (create, link, remove, rename, etc) are also +// sources of disk writes. Unlike a vfs.File which requires Write and Sync calls +// to be sequential, a vfs.FS may receive these filesystem metadata operations +// in parallel. To accommodate this parallelism, the diskHealthCheckingFS's +// write-oriented filesystem operations record their start times into a 'slot' +// on the filesystem. A single long-running goroutine periodically scans the +// slots looking for slow operations. +// +// The number of slots on a diskHealthCheckingFS grows to a working set of the +// maximum concurrent filesystem operations. This is expected to be very few +// for these reasons: +// 1. Pebble has limited write concurrency. Flushes, compactions and WAL +// rotations are the primary sources of filesystem metadata operations. With +// the default max-compaction concurrency, these operations require at most 5 +// concurrent slots if all 5 perform a filesystem metadata operation +// simultaneously. +// 2. Pebble's limited concurrent I/O writers spend most of their time +// performing file I/O, not performing the filesystem metadata operations that +// require recording a slot on the diskHealthCheckingFS. +// 3. In CockroachDB, each additional store/Pebble instance has its own vfs.FS +// which provides a separate goroutine and set of slots. +// 4. In CockroachDB, many of the additional sources of filesystem metadata +// operations (like encryption-at-rest) are sequential with respect to Pebble's +// threads. 
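+//
+// A rough usage sketch (hypothetical caller code; WithDiskHealthChecks,
+// vfs.Default and the callback signature come from this package, everything
+// else below is illustrative):
+//
+//	fs, closer := vfs.WithDiskHealthChecks(vfs.Default, 5*time.Second,
+//		func(path string, dur time.Duration) {
+//			// React to the stall, e.g. log it or record a metric.
+//		})
+//	defer closer.Close() // stops the long-running monitoring goroutine
+//	f, _ := fs.Create("example")
+//	// Writes and syncs on f, and metadata operations on fs, are now timed.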
type diskHealthCheckingFS struct { - FS - + tickInterval time.Duration diskSlowThreshold time.Duration onSlowDisk func(string, time.Duration) + fs FS + mu struct { + sync.Mutex + tickerRunning bool + stopper chan struct{} + inflight []*slot + } + // prealloc preallocates the memory for mu.inflight slots and the slice + // itself. The contained fields are not accessed directly except by + // WithDiskHealthChecks when initializing mu.inflight. The number of slots + // in d.mu.inflight will grow to the maximum number of concurrent file + // metadata operations (create, remove, link, etc). If the number of + // concurrent operations never exceeds preallocatedSlotCount, we'll never + // incur an additional allocation. + prealloc struct { + slots [preallocatedSlotCount]slot + slotPtrSlice [preallocatedSlotCount]*slot + } +} + +type slot struct { + name string + startNanos int64 } -// WithDiskHealthChecks wraps an FS and ensures that all -// write-oriented created with that FS are wrapped with disk health detection -// checks. Disk operations that are observed to take longer than -// diskSlowThreshold trigger an onSlowDisk call. +// diskHealthCheckingFS implements FS. +var _ FS = (*diskHealthCheckingFS)(nil) + +// WithDiskHealthChecks wraps an FS and ensures that all write-oriented +// operations on the FS are wrapped with disk health detection checks. Disk +// operations that are observed to take longer than diskSlowThreshold trigger an +// onSlowDisk call. +// +// A threshold of zero disables disk-health checking. func WithDiskHealthChecks( - fs FS, diskSlowThreshold time.Duration, onSlowDisk func(string, time.Duration), -) FS { - return diskHealthCheckingFS{ - FS: fs, + innerFS FS, diskSlowThreshold time.Duration, onSlowDisk func(string, time.Duration), +) (FS, io.Closer) { + if diskSlowThreshold == 0 { + return innerFS, noopCloser{} + } + + fs := &diskHealthCheckingFS{ + fs: innerFS, + tickInterval: defaultTickInterval, diskSlowThreshold: diskSlowThreshold, onSlowDisk: onSlowDisk, } + fs.mu.stopper = make(chan struct{}) + // The fs holds preallocated slots and a preallocated array of slot pointers + // with equal length. Initialize the inflight slice to use a slice backed by + // the preallocated array with each slot initialized to a preallocated slot. + fs.mu.inflight = fs.prealloc.slotPtrSlice[:] + for i := range fs.mu.inflight { + fs.mu.inflight[i] = &fs.prealloc.slots[i] + } + return fs, fs } -// Create implements the vfs.FS interface. -func (d diskHealthCheckingFS) Create(name string) (File, error) { - f, err := d.FS.Create(name) +func (d *diskHealthCheckingFS) timeFilesystemOp(name string, op func()) { + if d == nil { + op() + return + } + + // Record this operation's start time on the FS, so that the long-running + // goroutine can monitor the filesystem operation. + // + // The diskHealthCheckingFile implementation uses a single field that is + // atomically updated, taking advantage of the fact that writes to a single + // vfs.File handle are not performed in parallel. The vfs.FS however may + // receive write filesystem operations in parallel. To accommodate this + // parallelism, writing goroutines append their start time to a + // mutex-protected vector. On ticks, the long-running goroutine scans the + // vector searching for start times older than the slow-disk threshold. When + // a writing goroutine completes its operation, it atomically overwrites its + // slot to signal completion. 
+ var s *slot + func() { + d.mu.Lock() + defer d.mu.Unlock() + + // If there's no long-running goroutine to monitor this filesystem + // operation, start one. + if !d.mu.tickerRunning { + d.startTickerLocked() + } + + startNanos := time.Now().UnixNano() + for i := 0; i < len(d.mu.inflight); i++ { + if atomic.LoadInt64(&d.mu.inflight[i].startNanos) == 0 { + // This slot is not in use. Claim it. + s = d.mu.inflight[i] + s.name = name + atomic.StoreInt64(&s.startNanos, startNanos) + break + } + } + // If we didn't find any unused slots, create a new slot and append it. + // This slot will exist forever. The number of slots will grow to the + // maximum number of concurrent filesystem operations over the lifetime + // of the process. Only operations that grow the number of slots must + // incur an allocation. + if s == nil { + s = &slot{ + name: name, + startNanos: startNanos, + } + d.mu.inflight = append(d.mu.inflight, s) + } + }() + + op() + + // Signal completion by zeroing the start time. + atomic.StoreInt64(&s.startNanos, 0) +} + +// startTickerLocked starts a new goroutine with a ticker to monitor disk +// filesystem operations. Requires d.mu and !d.mu.tickerRunning. +func (d *diskHealthCheckingFS) startTickerLocked() { + d.mu.tickerRunning = true + stopper := d.mu.stopper + go func() { + ticker := time.NewTicker(d.tickInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + // Scan the inflight slots for any slots recording a start + // time older than the diskSlowThreshold. + d.mu.Lock() + now := time.Now() + for i := range d.mu.inflight { + nanos := atomic.LoadInt64(&d.mu.inflight[i].startNanos) + if nanos != 0 && time.Unix(0, nanos).Add(d.diskSlowThreshold).Before(now) { + // diskSlowThreshold was exceeded. Invoke the provided + // callback. + d.onSlowDisk(d.mu.inflight[i].name, now.Sub(time.Unix(0, nanos))) + } + } + d.mu.Unlock() + case <-stopper: + return + } + } + }() +} + +// Close implements io.Closer. Close stops the long-running goroutine that +// monitors for slow filesystem metadata operations. Close may be called +// multiple times. If the filesystem is used after Close has been called, a new +// long-running goroutine will be created. +func (d *diskHealthCheckingFS) Close() error { + d.mu.Lock() + if !d.mu.tickerRunning { + // Nothing to stop. + d.mu.Unlock() + return nil + } + + // Grab the stopper so we can request the long-running goroutine to stop. + // Replace the stopper in case this FS is reused. It's possible to Close and + // reuse a disk-health checking FS. This is to accommodate the on-by-default + // behavior in Pebble, and the possibility that users may continue to use + // the Pebble default FS beyond the lifetime of a single DB. + stopper := d.mu.stopper + d.mu.stopper = make(chan struct{}) + d.mu.tickerRunning = false + d.mu.Unlock() + + // Ask the long-running goroutine to stop. This is a synchronous channel + // send. + stopper <- struct{}{} + close(stopper) + return nil +} + +// Create implements the FS interface. +func (d *diskHealthCheckingFS) Create(name string) (File, error) { + var f File + var err error + d.timeFilesystemOp(name, func() { + f, err = d.fs.Create(name) + }) if err != nil { return f, err } @@ -160,9 +370,108 @@ func (d diskHealthCheckingFS) Create(name string) (File, error) { return WithFd(f, checkingFile), nil } -// ReuseForWrite implements the vfs.FS interface. 
-func (d diskHealthCheckingFS) ReuseForWrite(oldname, newname string) (File, error) { - f, err := d.FS.ReuseForWrite(oldname, newname) +// GetDiskUsage implements the FS interface. +func (d *diskHealthCheckingFS) GetDiskUsage(path string) (DiskUsage, error) { + return d.fs.GetDiskUsage(path) +} + +// Link implements the FS interface. +func (d *diskHealthCheckingFS) Link(oldname, newname string) error { + var err error + d.timeFilesystemOp(newname, func() { + err = d.fs.Link(oldname, newname) + }) + return err +} + +// List implements the FS interface. +func (d *diskHealthCheckingFS) List(dir string) ([]string, error) { + return d.fs.List(dir) +} + +// Lock implements the FS interface. +func (d *diskHealthCheckingFS) Lock(name string) (io.Closer, error) { + return d.fs.Lock(name) +} + +// MkdirAll implements the FS interface. +func (d *diskHealthCheckingFS) MkdirAll(dir string, perm os.FileMode) error { + var err error + d.timeFilesystemOp(dir, func() { + err = d.fs.MkdirAll(dir, perm) + }) + return err +} + +// Open implements the FS interface. +func (d *diskHealthCheckingFS) Open(name string, opts ...OpenOption) (File, error) { + return d.fs.Open(name, opts...) +} + +// OpenDir implements the FS interface. +func (d *diskHealthCheckingFS) OpenDir(name string) (File, error) { + f, err := d.fs.OpenDir(name) + if err != nil { + return f, err + } + // Directories opened with OpenDir must be opened with health checking, + // because they may be explicitly synced. + checkingFile := newDiskHealthCheckingFile(f, d.diskSlowThreshold, func(duration time.Duration) { + d.onSlowDisk(name, duration) + }) + checkingFile.startTicker() + return WithFd(f, checkingFile), nil +} + +// PathBase implements the FS interface. +func (d *diskHealthCheckingFS) PathBase(path string) string { + return d.fs.PathBase(path) +} + +// PathJoin implements the FS interface. +func (d *diskHealthCheckingFS) PathJoin(elem ...string) string { + return d.fs.PathJoin(elem...) +} + +// PathDir implements the FS interface. +func (d *diskHealthCheckingFS) PathDir(path string) string { + return d.fs.PathDir(path) +} + +// Remove implements the FS interface. +func (d *diskHealthCheckingFS) Remove(name string) error { + var err error + d.timeFilesystemOp(name, func() { + err = d.fs.Remove(name) + }) + return err +} + +// RemoveAll implements the FS interface. +func (d *diskHealthCheckingFS) RemoveAll(name string) error { + var err error + d.timeFilesystemOp(name, func() { + err = d.fs.RemoveAll(name) + }) + return err +} + +// Rename implements the FS interface. +func (d *diskHealthCheckingFS) Rename(oldname, newname string) error { + var err error + d.timeFilesystemOp(newname, func() { + err = d.fs.Rename(oldname, newname) + }) + return err +} + +// ReuseForWrite implements the FS interface. +func (d *diskHealthCheckingFS) ReuseForWrite(oldname, newname string) (File, error) { + var f File + var err error + d.timeFilesystemOp(newname, func() { + f, err = d.fs.ReuseForWrite(oldname, newname) + }) if err != nil { return f, err } @@ -175,3 +484,12 @@ func (d diskHealthCheckingFS) ReuseForWrite(oldname, newname string) (File, erro checkingFile.startTicker() return WithFd(f, checkingFile), nil } + +// Stat implements the FS interface. 
+func (d *diskHealthCheckingFS) Stat(name string) (os.FileInfo, error) { + return d.fs.Stat(name) +} + +type noopCloser struct{} + +func (noopCloser) Close() error { return nil } diff --git a/vfs/disk_health_test.go b/vfs/disk_health_test.go new file mode 100644 index 0000000000..3add516eb9 --- /dev/null +++ b/vfs/disk_health_test.go @@ -0,0 +1,312 @@ +// Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +package vfs + +import ( + "io" + "os" + "sync/atomic" + "testing" + "time" + + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" +) + +type mockFile struct { + syncDuration time.Duration +} + +func (m mockFile) Close() error { + return nil +} + +func (m mockFile) Read(p []byte) (n int, err error) { + panic("unimplemented") +} + +func (m mockFile) ReadAt(p []byte, off int64) (n int, err error) { + panic("unimplemented") +} + +func (m mockFile) Write(p []byte) (n int, err error) { + time.Sleep(m.syncDuration) + return len(p), nil +} + +func (m mockFile) Stat() (os.FileInfo, error) { + panic("unimplemented") +} + +func (m mockFile) Sync() error { + time.Sleep(m.syncDuration) + return nil +} + +var _ File = &mockFile{} + +type mockFS struct { + create func(string) (File, error) + link func(string, string) error + list func(string) ([]string, error) + lock func(string) (io.Closer, error) + mkdirAll func(string, os.FileMode) error + open func(string, ...OpenOption) (File, error) + openDir func(string) (File, error) + pathBase func(string) string + pathJoin func(...string) string + pathDir func(string) string + remove func(string) error + removeAll func(string) error + rename func(string, string) error + reuseForWrite func(string, string) (File, error) + stat func(string) (os.FileInfo, error) + getDiskUsage func(string) (DiskUsage, error) +} + +func (m mockFS) Create(name string) (File, error) { + if m.create == nil { + panic("unimplemented") + } + return m.create(name) +} + +func (m mockFS) Link(oldname, newname string) error { + if m.link == nil { + panic("unimplemented") + } + return m.link(oldname, newname) +} + +func (m mockFS) Open(name string, opts ...OpenOption) (File, error) { + if m.open == nil { + panic("unimplemented") + } + return m.open(name, opts...) 
+} + +func (m mockFS) OpenDir(name string) (File, error) { + if m.openDir == nil { + panic("unimplemented") + } + return m.openDir(name) +} + +func (m mockFS) Remove(name string) error { + if m.remove == nil { + panic("unimplemented") + } + return m.remove(name) +} + +func (m mockFS) RemoveAll(name string) error { + if m.removeAll == nil { + panic("unimplemented") + } + return m.removeAll(name) +} + +func (m mockFS) Rename(oldname, newname string) error { + if m.rename == nil { + panic("unimplemented") + } + return m.rename(oldname, newname) +} + +func (m mockFS) ReuseForWrite(oldname, newname string) (File, error) { + if m.reuseForWrite == nil { + panic("unimplemented") + } + return m.reuseForWrite(oldname, newname) +} + +func (m mockFS) MkdirAll(dir string, perm os.FileMode) error { + if m.mkdirAll == nil { + panic("unimplemented") + } + return m.mkdirAll(dir, perm) +} + +func (m mockFS) Lock(name string) (io.Closer, error) { + if m.lock == nil { + panic("unimplemented") + } + return m.lock(name) +} + +func (m mockFS) List(dir string) ([]string, error) { + if m.list == nil { + panic("unimplemented") + } + return m.list(dir) +} + +func (m mockFS) Stat(name string) (os.FileInfo, error) { + if m.stat == nil { + panic("unimplemented") + } + return m.stat(name) +} + +func (m mockFS) PathBase(path string) string { + if m.pathBase == nil { + panic("unimplemented") + } + return m.pathBase(path) +} + +func (m mockFS) PathJoin(elem ...string) string { + if m.pathJoin == nil { + panic("unimplemented") + } + return m.pathJoin(elem...) +} + +func (m mockFS) PathDir(path string) string { + if m.pathDir == nil { + panic("unimplemented") + } + return m.pathDir(path) +} + +func (m mockFS) GetDiskUsage(path string) (DiskUsage, error) { + if m.getDiskUsage == nil { + panic("unimplemented") + } + return m.getDiskUsage(path) +} + +var _ FS = &mockFS{} + +func TestDiskHealthChecking_Sync(t *testing.T) { + diskSlow := make(chan time.Duration, 100) + slowThreshold := 1 * time.Second + mockFS := &mockFS{create: func(name string) (File, error) { + return mockFile{syncDuration: 3 * time.Second}, nil + }} + fs, closer := WithDiskHealthChecks(mockFS, slowThreshold, + func(s string, duration time.Duration) { + diskSlow <- duration + }) + defer closer.Close() + dhFile, _ := fs.Create("test") + defer dhFile.Close() + + dhFile.Sync() + + select { + case d := <-diskSlow: + if d.Seconds() < slowThreshold.Seconds() { + t.Fatalf("expected %0.1f to be greater than threshold %0.1f", d.Seconds(), slowThreshold.Seconds()) + } + case <-time.After(5 * time.Second): + t.Fatal("disk stall detector did not detect slow disk operation") + } +} + +var ( + errInjected = errors.New("injected error") +) + +func filesystemOpsMockFS(sleepDur time.Duration) *mockFS { + return &mockFS{ + create: func(name string) (File, error) { + time.Sleep(sleepDur) + return nil, errInjected + }, + link: func(oldname, newname string) error { + time.Sleep(sleepDur) + return errInjected + }, + mkdirAll: func(string, os.FileMode) error { + time.Sleep(sleepDur) + return errInjected + }, + remove: func(name string) error { + time.Sleep(sleepDur) + return errInjected + }, + removeAll: func(name string) error { + time.Sleep(sleepDur) + return errInjected + }, + rename: func(oldname, newname string) error { + time.Sleep(sleepDur) + return errInjected + }, + reuseForWrite: func(oldname, newname string) (File, error) { + time.Sleep(sleepDur) + return nil, errInjected + }, + } +} + +func stallFilesystemOperations(fs FS) map[string]func() { + return map[string]func(){ 
+ "create": func() { _, _ = fs.Create("foo") }, + "link": func() { _ = fs.Link("foo", "bar") }, + "mkdir-all": func() { _ = fs.MkdirAll("foo", os.ModePerm) }, + "remove": func() { _ = fs.Remove("foo") }, + "remove-all": func() { _ = fs.RemoveAll("foo") }, + "rename": func() { _ = fs.Rename("foo", "bar") }, + "reuse-for-write": func() { _, _ = fs.ReuseForWrite("foo", "bar") }, + } +} + +func TestDiskHealthChecking_Filesystem(t *testing.T) { + const sleepDur = 50 * time.Millisecond + const stallThreshold = 10 * time.Millisecond + + // Wrap with disk-health checking, counting each stall on stallCount. + var stallCount uint64 + fs, closer := WithDiskHealthChecks(filesystemOpsMockFS(sleepDur), stallThreshold, + func(name string, dur time.Duration) { + atomic.AddUint64(&stallCount, 1) + }) + defer closer.Close() + fs.(*diskHealthCheckingFS).tickInterval = 5 * time.Millisecond + ops := stallFilesystemOperations(fs) + for name, op := range ops { + t.Run(name, func(t *testing.T) { + before := atomic.LoadUint64(&stallCount) + op() + after := atomic.LoadUint64(&stallCount) + require.Greater(t, int(after-before), 0) + }) + } +} + +// TestDiskHealthChecking_Filesystem_Close tests the behavior of repeatedly +// closing and reusing a filesystem wrapped by WithDiskHealthChecks. This is a +// permitted usage because it allows (*pebble.Options).EnsureDefaults to wrap +// with disk-health checking by default, and to clean up the long-running +// goroutine on (*pebble.DB).Close, while still allowing the FS to be used +// multiple times. +func TestDiskHealthChecking_Filesystem_Close(t *testing.T) { + const stallThreshold = 10 * time.Millisecond + mockFS := &mockFS{ + create: func(name string) (File, error) { + time.Sleep(50 * time.Millisecond) + return &mockFile{}, nil + }, + } + + stalled := map[string]time.Duration{} + fs, closer := WithDiskHealthChecks(mockFS, stallThreshold, + func(name string, dur time.Duration) { stalled[name] = dur }) + fs.(*diskHealthCheckingFS).tickInterval = 5 * time.Millisecond + + files := []string{"foo", "bar", "bax"} + for _, filename := range files { + // Create will stall, and the detector should write to the stalled map + // with the filename. + _, _ = fs.Create(filename) + // Invoke the closer. This will cause the long-running goroutine to + // exit, but the fs should still be usable and should still detect + // subsequent stalls on the next iteration. + require.NoError(t, closer.Close()) + require.Contains(t, stalled, filename) + } +} diff --git a/vfs/disk_heath_test.go b/vfs/disk_heath_test.go deleted file mode 100644 index 15ec57c1ec..0000000000 --- a/vfs/disk_heath_test.go +++ /dev/null @@ -1,136 +0,0 @@ -// Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use -// of this source code is governed by a BSD-style license that can be found in -// the LICENSE file. 
- -package vfs - -import ( - "io" - "os" - "testing" - "time" -) - -type mockFile struct { - syncDuration time.Duration -} - -func (m mockFile) Close() error { - return nil -} - -func (m mockFile) Read(p []byte) (n int, err error) { - panic("unimplemented") -} - -func (m mockFile) ReadAt(p []byte, off int64) (n int, err error) { - panic("unimplemented") -} - -func (m mockFile) Write(p []byte) (n int, err error) { - time.Sleep(m.syncDuration) - return len(p), nil -} - -func (m mockFile) Stat() (os.FileInfo, error) { - panic("unimplemented") -} - -func (m mockFile) Sync() error { - time.Sleep(m.syncDuration) - return nil -} - -var _ File = &mockFile{} - -type mockFS struct { - syncDuration time.Duration -} - -func (m mockFS) Create(name string) (File, error) { - return mockFile(m), nil -} - -func (m mockFS) Link(oldname, newname string) error { - panic("unimplemented") -} - -func (m mockFS) Open(name string, opts ...OpenOption) (File, error) { - panic("unimplemented") -} - -func (m mockFS) OpenDir(name string) (File, error) { - panic("unimplemented") -} - -func (m mockFS) Remove(name string) error { - panic("unimplemented") -} - -func (m mockFS) RemoveAll(name string) error { - panic("unimplemented") -} - -func (m mockFS) Rename(oldname, newname string) error { - panic("unimplemented") -} - -func (m mockFS) ReuseForWrite(oldname, newname string) (File, error) { - return mockFile(m), nil -} - -func (m mockFS) MkdirAll(dir string, perm os.FileMode) error { - panic("unimplemented") -} - -func (m mockFS) Lock(name string) (io.Closer, error) { - panic("unimplemented") -} - -func (m mockFS) List(dir string) ([]string, error) { - panic("unimplemented") -} - -func (m mockFS) Stat(name string) (os.FileInfo, error) { - panic("unimplemented") -} - -func (m mockFS) PathBase(path string) string { - panic("unimplemented") -} - -func (m mockFS) PathJoin(elem ...string) string { - panic("unimplemented") -} - -func (m mockFS) PathDir(path string) string { - panic("unimplemented") -} - -func (m mockFS) GetDiskUsage(path string) (DiskUsage, error) { - panic("unimplemented") -} - -var _ FS = &mockFS{} - -func TestDiskHealthChecking(t *testing.T) { - diskSlow := make(chan time.Duration, 100) - slowThreshold := 1 * time.Second - mockFS := &mockFS{syncDuration: 3 * time.Second} - fs := WithDiskHealthChecks(mockFS, slowThreshold, func(s string, duration time.Duration) { - diskSlow <- duration - }) - dhFile, _ := fs.Create("test") - defer dhFile.Close() - - dhFile.Sync() - - select { - case d := <-diskSlow: - if d.Seconds() < slowThreshold.Seconds() { - t.Fatalf("expected %0.1f to be greater than threshold %0.1f", d.Seconds(), slowThreshold.Seconds()) - } - case <-time.After(5 * time.Second): - t.Fatal("disk stall detector did not detect slow disk operation") - } -} diff --git a/vfs/fd_test.go b/vfs/fd_test.go index 785b1f368b..2a7d5f9c10 100644 --- a/vfs/fd_test.go +++ b/vfs/fd_test.go @@ -22,7 +22,9 @@ func TestFileWrappersHaveFd(t *testing.T) { defer os.Remove(filename) // File wrapper case 1: Check if diskHealthCheckingFile has Fd(). 
- fs2 := WithDiskHealthChecks(Default, 10*time.Second, func(s string, duration time.Duration) {}) + fs2, closer := WithDiskHealthChecks(Default, 10*time.Second, + func(s string, duration time.Duration) {}) + defer closer.Close() f2, err := fs2.Open(filename) require.NoError(t, err) if _, ok := f2.(fdGetter); !ok { diff --git a/vfs/vfs.go b/vfs/vfs.go index 965bf373df..c816a8aaa0 100644 --- a/vfs/vfs.go +++ b/vfs/vfs.go @@ -18,6 +18,9 @@ import ( // // Typically, it will be an *os.File, but test code may choose to substitute // memory-backed implementations. +// +// Write-oriented operations (Write, Sync) must be called sequentially: At most +// 1 call to Write or Sync may be executed at any given time. type File interface { io.Closer io.Reader
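A condensed, self-contained sketch of the slot scheme described in the commit
message, for illustration only. The names opTracker, trackOp and monitor are
hypothetical stand-ins for diskHealthCheckingFS, timeFilesystemOp and the
goroutine launched by startTickerLocked; the thresholds and the printed output
are arbitrary.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// opSlot mirrors the patch's slot type: a startNanos of zero marks a free slot.
type opSlot struct {
	name       string
	startNanos int64
}

// opTracker mirrors the mutex-protected inflight slice on diskHealthCheckingFS.
type opTracker struct {
	mu       sync.Mutex
	inflight []*opSlot
}

// trackOp claims a free slot (or appends a new one) under the mutex, runs op,
// then releases the slot with a single atomic store, without retaking the lock.
func (t *opTracker) trackOp(name string, op func()) {
	var s *opSlot
	t.mu.Lock()
	for _, candidate := range t.inflight {
		if atomic.LoadInt64(&candidate.startNanos) == 0 {
			s = candidate
			break
		}
	}
	if s == nil {
		s = &opSlot{}
		t.inflight = append(t.inflight, s)
	}
	s.name = name
	atomic.StoreInt64(&s.startNanos, time.Now().UnixNano())
	t.mu.Unlock()

	op()
	// Signal completion without reacquiring the mutex.
	atomic.StoreInt64(&s.startNanos, 0)
}

// monitor periodically scans the slots for operations still in flight after
// threshold, mirroring the goroutine launched by startTickerLocked.
func (t *opTracker) monitor(threshold, tick time.Duration, stop <-chan struct{}) {
	ticker := time.NewTicker(tick)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			t.mu.Lock()
			now := time.Now()
			for _, s := range t.inflight {
				if nanos := atomic.LoadInt64(&s.startNanos); nanos != 0 &&
					now.Sub(time.Unix(0, nanos)) > threshold {
					fmt.Printf("slow operation %q: %s\n", s.name, now.Sub(time.Unix(0, nanos)))
				}
			}
			t.mu.Unlock()
		case <-stop:
			return
		}
	}
}

func main() {
	t := &opTracker{}
	stop := make(chan struct{})
	go t.monitor(20*time.Millisecond, 5*time.Millisecond, stop)
	// A 50ms "rename" exceeds the 20ms threshold, so monitor reports it.
	t.trackOp("rename", func() { time.Sleep(50 * time.Millisecond) })
	close(stop)
}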