diff --git a/db.go b/db.go index c2a5ed0f00..9f3fcfd382 100644 --- a/db.go +++ b/db.go @@ -1241,6 +1241,11 @@ func (d *DB) Close() error { d.deleters.Wait() d.compactionSchedulers.Wait() d.mu.Lock() + + // If the options include a closer to 'close' the filesystem, close it. + if d.opts.private.fsCloser != nil { + d.opts.private.fsCloser.Close() + } return err } diff --git a/options.go b/options.go index 9bdd9b9a06..ef17dcb521 100644 --- a/options.go +++ b/options.go @@ -7,6 +7,7 @@ package pebble import ( "bytes" "fmt" + "io" "runtime" "strconv" "strings" @@ -703,6 +704,16 @@ type Options struct { // default is 1 MB/s. Currently disabled as this option has no effect while // private.enablePacing is false. minFlushRate int + + // fsCloser holds a closer that should be invoked after a DB using these + // Options is closed. This is used to automatically stop the + // long-running goroutine associated with the disk-health-checking FS. + // See the initialization of FS in EnsureDefaults. Note that care has + // been taken to ensure that it is still safe to continue using the FS + // after this closer has been invoked. However, if write operations + // against the FS are made after the DB is closed, the FS may leak a + // goroutine indefinitely. + fsCloser io.Closer } } @@ -826,7 +837,7 @@ func (o *Options) EnsureDefaults() *Options { } if o.FS == nil { - o.FS = vfs.WithDiskHealthChecks(vfs.Default, 5*time.Second, + o.FS, o.private.fsCloser = vfs.WithDiskHealthChecks(vfs.Default, 5*time.Second, func(name string, duration time.Duration) { o.EventListener.DiskSlow(DiskSlowInfo{ Path: name, diff --git a/vfs/disk_health.go b/vfs/disk_health.go index 19fdeb6174..e7b0b129b1 100644 --- a/vfs/disk_health.go +++ b/vfs/disk_health.go @@ -5,6 +5,9 @@ package vfs import ( + "io" + "os" + "sync" "sync/atomic" "time" ) @@ -13,6 +16,12 @@ const ( // defaultTickInterval is the default interval between two ticks of each // diskHealthCheckingFile loop iteration. defaultTickInterval = 2 * time.Second + // preallocatedSlotCount is the default number of slots available for + // concurrent filesystem operations. The slot count may be exceeded, but + // each additional slot will incur an additional allocation. We choose 16 + // here with the expectation that it is significantly more than required in + // practice. See the comment above the diskHealthCheckingFS type definition. + preallocatedSlotCount = 16 ) // diskHealthCheckingFile is a File wrapper to detect slow disk operations, and @@ -123,30 +132,231 @@ func (d *diskHealthCheckingFile) timeDiskOp(op func()) { op() } +// diskHealthCheckingFS adds disk-health checking facilities to a VFS. +// It times disk write operations in two ways: +// +// 1. Wrapping vfs.Files. +// +// The bulk of write I/O activity is file writing and syncing, invoked through +// the `vfs.File` interface. This VFS wraps all files open for writing with a +// special diskHealthCheckingFile implementation of the vfs.File interface. See +// above for the implementation. +// +// 2. Monitoring filesystem metadata operations. +// +// Filesystem metadata operations (create, link, remove, rename, etc) are also +// sources of disk writes. Unlike a vfs.File which requires Write and Sync calls +// to be sequential, a vfs.FS may receive these filesystem metadata operations +// in parallel. To accommodate this parallelism, the diskHealthCheckingFS's +// write-oriented filesystem operations record their start times into a 'slot' +// on the filesystem. A single long-running goroutine periodically scans the +// slots looking for slow operations. +// +// The number of slots on a diskHealthCheckingFS grows to a working set of the +// maximum concurrent filesystem operations. This is expected to be very few +// for these reasons: +// 1. Pebble has limited write concurrency. Flushes, compactions and WAL +// rotations are the primary sources of filesystem metadata operations. With +// the default max-compaction concurrency, these operations require at most 5 +// concurrent slots if all 5 perform a filesystem metadata operation +// simultaneously. +// 2. Pebble's limited concurrent I/O writers spend most of their time +// performing file I/O, not performing the filesystem metadata operations that +// require recording a slot on the diskHealthCheckingFS. +// 3. In CockroachDB, each additional store/Pebble instance has its own vfs.FS +// which provides a separate goroutine and set of slots. +// 4. In CockroachDB, many of the additional sources of filesystem metadata +// operations (like encryption-at-rest) are sequential with respect to Pebble's +// threads. type diskHealthCheckingFS struct { - FS - + tickInterval time.Duration diskSlowThreshold time.Duration onSlowDisk func(string, time.Duration) + fs FS + mu struct { + sync.Mutex + tickerRunning bool + stopper chan struct{} + inflight []*slot + } + // prealloc preallocates the memory for mu.inflight slots and the slice + // itself. The contained fields are not accessed directly except by + // WithDiskHealthChecks when initializing mu.inflight. The number of slots + // in d.mu.inflight will grow to the maximum number of concurrent file + // metadata operations (create, remove, link, etc). If the number of + // concurrent operations never exceeds preallocatedSlotCount, we'll never + // incur an additional allocation. + prealloc struct { + slots [preallocatedSlotCount]slot + slotPtrSlice [preallocatedSlotCount]*slot + } +} + +type slot struct { + name string + startNanos int64 } -// WithDiskHealthChecks wraps an FS and ensures that all -// write-oriented created with that FS are wrapped with disk health detection -// checks. Disk operations that are observed to take longer than -// diskSlowThreshold trigger an onSlowDisk call. +// diskHealthCheckingFS implements FS. +var _ FS = (*diskHealthCheckingFS)(nil) + +// WithDiskHealthChecks wraps an FS and ensures that all write-oriented +// operations on the FS are wrapped with disk health detection checks. Disk +// operations that are observed to take longer than diskSlowThreshold trigger an +// onSlowDisk call. +// +// A threshold of zero disables disk-health checking. func WithDiskHealthChecks( - fs FS, diskSlowThreshold time.Duration, onSlowDisk func(string, time.Duration), -) FS { - return diskHealthCheckingFS{ - FS: fs, + innerFS FS, diskSlowThreshold time.Duration, onSlowDisk func(string, time.Duration), +) (FS, io.Closer) { + if diskSlowThreshold == 0 { + return innerFS, noopCloser{} + } + + fs := &diskHealthCheckingFS{ + fs: innerFS, + tickInterval: defaultTickInterval, diskSlowThreshold: diskSlowThreshold, onSlowDisk: onSlowDisk, } + fs.mu.stopper = make(chan struct{}) + // The fs holds preallocated slots and a preallocated array of slot pointers + // with equal length. Initialize the inflight slice to use a slice backed by + // the preallocated array with each slot initialized to a preallocated slot. + fs.mu.inflight = fs.prealloc.slotPtrSlice[:] + for i := range fs.mu.inflight { + fs.mu.inflight[i] = &fs.prealloc.slots[i] + } + return fs, fs } -// Create implements the vfs.FS interface. -func (d diskHealthCheckingFS) Create(name string) (File, error) { - f, err := d.FS.Create(name) +func (d *diskHealthCheckingFS) timeFilesystemOp(name string, op func()) { + if d == nil { + op() + return + } + + // Record this operation's start time on the FS, so that the long-running + // goroutine can monitor the filesystem operation. + // + // The diskHealthCheckingFile implementation uses a single field that is + // atomically updated, taking advantage of the fact that writes to a single + // vfs.File handle are not performed in parallel. The vfs.FS however may + // receive write filesystem operations in parallel. To accommodate this + // parallelism, writing goroutines append their start time to a + // mutex-protected vector. On ticks, the long-running goroutine scans the + // vector searching for start times older than the slow-disk threshold. When + // a writing goroutine completes its operation, it atomically overwrites its + // slot to signal completion. + var s *slot + func() { + d.mu.Lock() + defer d.mu.Unlock() + + // If there's no long-running goroutine to monitor this filesystem + // operation, start one. + if !d.mu.tickerRunning { + d.startTickerLocked() + } + + startNanos := time.Now().UnixNano() + for i := 0; i < len(d.mu.inflight); i++ { + if atomic.LoadInt64(&d.mu.inflight[i].startNanos) == 0 { + // This slot is not in use. Claim it. + s = d.mu.inflight[i] + s.name = name + atomic.StoreInt64(&s.startNanos, startNanos) + break + } + } + // If we didn't find any unused slots, create a new slot and append it. + // This slot will exist forever. The number of slots will grow to the + // maximum number of concurrent filesystem operations over the lifetime + // of the process. Only operations that grow the number of slots must + // incur an allocation. + if s == nil { + s = &slot{ + name: name, + startNanos: startNanos, + } + d.mu.inflight = append(d.mu.inflight, s) + } + }() + + op() + + // Signal completion by zeroing the start time. + atomic.StoreInt64(&s.startNanos, 0) +} + +// startTickerLocked starts a new goroutine with a ticker to monitor disk +// filesystem operations. Requires d.mu and !d.mu.tickerRunning. +func (d *diskHealthCheckingFS) startTickerLocked() { + d.mu.tickerRunning = true + stopper := d.mu.stopper + go func() { + ticker := time.NewTicker(d.tickInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + // Scan the inflight slots for any slots recording a start + // time older than the diskSlowThreshold. + d.mu.Lock() + now := time.Now() + for i := range d.mu.inflight { + nanos := atomic.LoadInt64(&d.mu.inflight[i].startNanos) + if nanos != 0 && time.Unix(0, nanos).Add(d.diskSlowThreshold).Before(now) { + // diskSlowThreshold was exceeded. Invoke the provided + // callback. + d.onSlowDisk(d.mu.inflight[i].name, now.Sub(time.Unix(0, nanos))) + } + } + d.mu.Unlock() + case <-stopper: + return + } + } + }() +} + +// Close implements io.Closer. Close stops the long-running goroutine that +// monitors for slow filesystem metadata operations. Close may be called +// multiple times. If the filesystem is used after Close has been called, a new +// long-running goroutine will be created. +func (d *diskHealthCheckingFS) Close() error { + d.mu.Lock() + if !d.mu.tickerRunning { + // Nothing to stop. + d.mu.Unlock() + return nil + } + + // Grab the stopper so we can request the long-running goroutine to stop. + // Replace the stopper in case this FS is reused. It's possible to Close and + // reuse a disk-health checking FS. This is to accommodate the on-by-default + // behavior in Pebble, and the possibility that users may continue to use + // the Pebble default FS beyond the lifetime of a single DB. + stopper := d.mu.stopper + d.mu.stopper = make(chan struct{}) + d.mu.tickerRunning = false + d.mu.Unlock() + + // Ask the long-running goroutine to stop. This is a synchronous channel + // send. + stopper <- struct{}{} + close(stopper) + return nil +} + +// Create implements the FS interface. +func (d *diskHealthCheckingFS) Create(name string) (File, error) { + var f File + var err error + d.timeFilesystemOp(name, func() { + f, err = d.fs.Create(name) + }) if err != nil { return f, err } @@ -160,9 +370,108 @@ func (d diskHealthCheckingFS) Create(name string) (File, error) { return WithFd(f, checkingFile), nil } -// ReuseForWrite implements the vfs.FS interface. -func (d diskHealthCheckingFS) ReuseForWrite(oldname, newname string) (File, error) { - f, err := d.FS.ReuseForWrite(oldname, newname) +// GetDiskUsage implements the FS interface. +func (d *diskHealthCheckingFS) GetDiskUsage(path string) (DiskUsage, error) { + return d.fs.GetDiskUsage(path) +} + +// Link implements the FS interface. +func (d *diskHealthCheckingFS) Link(oldname, newname string) error { + var err error + d.timeFilesystemOp(newname, func() { + err = d.fs.Link(oldname, newname) + }) + return err +} + +// List implements the FS interface. +func (d *diskHealthCheckingFS) List(dir string) ([]string, error) { + return d.fs.List(dir) +} + +// Lock implements the FS interface. +func (d *diskHealthCheckingFS) Lock(name string) (io.Closer, error) { + return d.fs.Lock(name) +} + +// MkdirAll implements the FS interface. +func (d *diskHealthCheckingFS) MkdirAll(dir string, perm os.FileMode) error { + var err error + d.timeFilesystemOp(dir, func() { + err = d.fs.MkdirAll(dir, perm) + }) + return err +} + +// Open implements the FS interface. +func (d *diskHealthCheckingFS) Open(name string, opts ...OpenOption) (File, error) { + return d.fs.Open(name, opts...) +} + +// OpenDir implements the FS interface. +func (d *diskHealthCheckingFS) OpenDir(name string) (File, error) { + f, err := d.fs.OpenDir(name) + if err != nil { + return f, err + } + // Directories opened with OpenDir must be opened with health checking, + // because they may be explicitly synced. + checkingFile := newDiskHealthCheckingFile(f, d.diskSlowThreshold, func(duration time.Duration) { + d.onSlowDisk(name, duration) + }) + checkingFile.startTicker() + return WithFd(f, checkingFile), nil +} + +// PathBase implements the FS interface. +func (d *diskHealthCheckingFS) PathBase(path string) string { + return d.fs.PathBase(path) +} + +// PathJoin implements the FS interface. +func (d *diskHealthCheckingFS) PathJoin(elem ...string) string { + return d.fs.PathJoin(elem...) +} + +// PathDir implements the FS interface. +func (d *diskHealthCheckingFS) PathDir(path string) string { + return d.fs.PathDir(path) +} + +// Remove implements the FS interface. +func (d *diskHealthCheckingFS) Remove(name string) error { + var err error + d.timeFilesystemOp(name, func() { + err = d.fs.Remove(name) + }) + return err +} + +// RemoveAll implements the FS interface. +func (d *diskHealthCheckingFS) RemoveAll(name string) error { + var err error + d.timeFilesystemOp(name, func() { + err = d.fs.RemoveAll(name) + }) + return err +} + +// Rename implements the FS interface. +func (d *diskHealthCheckingFS) Rename(oldname, newname string) error { + var err error + d.timeFilesystemOp(newname, func() { + err = d.fs.Rename(oldname, newname) + }) + return err +} + +// ReuseForWrite implements the FS interface. +func (d *diskHealthCheckingFS) ReuseForWrite(oldname, newname string) (File, error) { + var f File + var err error + d.timeFilesystemOp(newname, func() { + f, err = d.fs.ReuseForWrite(oldname, newname) + }) if err != nil { return f, err } @@ -175,3 +484,12 @@ func (d diskHealthCheckingFS) ReuseForWrite(oldname, newname string) (File, erro checkingFile.startTicker() return WithFd(f, checkingFile), nil } + +// Stat implements the FS interface. +func (d *diskHealthCheckingFS) Stat(name string) (os.FileInfo, error) { + return d.fs.Stat(name) +} + +type noopCloser struct{} + +func (noopCloser) Close() error { return nil } diff --git a/vfs/disk_health_test.go b/vfs/disk_health_test.go new file mode 100644 index 0000000000..3add516eb9 --- /dev/null +++ b/vfs/disk_health_test.go @@ -0,0 +1,312 @@ +// Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +package vfs + +import ( + "io" + "os" + "sync/atomic" + "testing" + "time" + + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" +) + +type mockFile struct { + syncDuration time.Duration +} + +func (m mockFile) Close() error { + return nil +} + +func (m mockFile) Read(p []byte) (n int, err error) { + panic("unimplemented") +} + +func (m mockFile) ReadAt(p []byte, off int64) (n int, err error) { + panic("unimplemented") +} + +func (m mockFile) Write(p []byte) (n int, err error) { + time.Sleep(m.syncDuration) + return len(p), nil +} + +func (m mockFile) Stat() (os.FileInfo, error) { + panic("unimplemented") +} + +func (m mockFile) Sync() error { + time.Sleep(m.syncDuration) + return nil +} + +var _ File = &mockFile{} + +type mockFS struct { + create func(string) (File, error) + link func(string, string) error + list func(string) ([]string, error) + lock func(string) (io.Closer, error) + mkdirAll func(string, os.FileMode) error + open func(string, ...OpenOption) (File, error) + openDir func(string) (File, error) + pathBase func(string) string + pathJoin func(...string) string + pathDir func(string) string + remove func(string) error + removeAll func(string) error + rename func(string, string) error + reuseForWrite func(string, string) (File, error) + stat func(string) (os.FileInfo, error) + getDiskUsage func(string) (DiskUsage, error) +} + +func (m mockFS) Create(name string) (File, error) { + if m.create == nil { + panic("unimplemented") + } + return m.create(name) +} + +func (m mockFS) Link(oldname, newname string) error { + if m.link == nil { + panic("unimplemented") + } + return m.link(oldname, newname) +} + +func (m mockFS) Open(name string, opts ...OpenOption) (File, error) { + if m.open == nil { + panic("unimplemented") + } + return m.open(name, opts...) +} + +func (m mockFS) OpenDir(name string) (File, error) { + if m.openDir == nil { + panic("unimplemented") + } + return m.openDir(name) +} + +func (m mockFS) Remove(name string) error { + if m.remove == nil { + panic("unimplemented") + } + return m.remove(name) +} + +func (m mockFS) RemoveAll(name string) error { + if m.removeAll == nil { + panic("unimplemented") + } + return m.removeAll(name) +} + +func (m mockFS) Rename(oldname, newname string) error { + if m.rename == nil { + panic("unimplemented") + } + return m.rename(oldname, newname) +} + +func (m mockFS) ReuseForWrite(oldname, newname string) (File, error) { + if m.reuseForWrite == nil { + panic("unimplemented") + } + return m.reuseForWrite(oldname, newname) +} + +func (m mockFS) MkdirAll(dir string, perm os.FileMode) error { + if m.mkdirAll == nil { + panic("unimplemented") + } + return m.mkdirAll(dir, perm) +} + +func (m mockFS) Lock(name string) (io.Closer, error) { + if m.lock == nil { + panic("unimplemented") + } + return m.lock(name) +} + +func (m mockFS) List(dir string) ([]string, error) { + if m.list == nil { + panic("unimplemented") + } + return m.list(dir) +} + +func (m mockFS) Stat(name string) (os.FileInfo, error) { + if m.stat == nil { + panic("unimplemented") + } + return m.stat(name) +} + +func (m mockFS) PathBase(path string) string { + if m.pathBase == nil { + panic("unimplemented") + } + return m.pathBase(path) +} + +func (m mockFS) PathJoin(elem ...string) string { + if m.pathJoin == nil { + panic("unimplemented") + } + return m.pathJoin(elem...) +} + +func (m mockFS) PathDir(path string) string { + if m.pathDir == nil { + panic("unimplemented") + } + return m.pathDir(path) +} + +func (m mockFS) GetDiskUsage(path string) (DiskUsage, error) { + if m.getDiskUsage == nil { + panic("unimplemented") + } + return m.getDiskUsage(path) +} + +var _ FS = &mockFS{} + +func TestDiskHealthChecking_Sync(t *testing.T) { + diskSlow := make(chan time.Duration, 100) + slowThreshold := 1 * time.Second + mockFS := &mockFS{create: func(name string) (File, error) { + return mockFile{syncDuration: 3 * time.Second}, nil + }} + fs, closer := WithDiskHealthChecks(mockFS, slowThreshold, + func(s string, duration time.Duration) { + diskSlow <- duration + }) + defer closer.Close() + dhFile, _ := fs.Create("test") + defer dhFile.Close() + + dhFile.Sync() + + select { + case d := <-diskSlow: + if d.Seconds() < slowThreshold.Seconds() { + t.Fatalf("expected %0.1f to be greater than threshold %0.1f", d.Seconds(), slowThreshold.Seconds()) + } + case <-time.After(5 * time.Second): + t.Fatal("disk stall detector did not detect slow disk operation") + } +} + +var ( + errInjected = errors.New("injected error") +) + +func filesystemOpsMockFS(sleepDur time.Duration) *mockFS { + return &mockFS{ + create: func(name string) (File, error) { + time.Sleep(sleepDur) + return nil, errInjected + }, + link: func(oldname, newname string) error { + time.Sleep(sleepDur) + return errInjected + }, + mkdirAll: func(string, os.FileMode) error { + time.Sleep(sleepDur) + return errInjected + }, + remove: func(name string) error { + time.Sleep(sleepDur) + return errInjected + }, + removeAll: func(name string) error { + time.Sleep(sleepDur) + return errInjected + }, + rename: func(oldname, newname string) error { + time.Sleep(sleepDur) + return errInjected + }, + reuseForWrite: func(oldname, newname string) (File, error) { + time.Sleep(sleepDur) + return nil, errInjected + }, + } +} + +func stallFilesystemOperations(fs FS) map[string]func() { + return map[string]func(){ + "create": func() { _, _ = fs.Create("foo") }, + "link": func() { _ = fs.Link("foo", "bar") }, + "mkdir-all": func() { _ = fs.MkdirAll("foo", os.ModePerm) }, + "remove": func() { _ = fs.Remove("foo") }, + "remove-all": func() { _ = fs.RemoveAll("foo") }, + "rename": func() { _ = fs.Rename("foo", "bar") }, + "reuse-for-write": func() { _, _ = fs.ReuseForWrite("foo", "bar") }, + } +} + +func TestDiskHealthChecking_Filesystem(t *testing.T) { + const sleepDur = 50 * time.Millisecond + const stallThreshold = 10 * time.Millisecond + + // Wrap with disk-health checking, counting each stall on stallCount. + var stallCount uint64 + fs, closer := WithDiskHealthChecks(filesystemOpsMockFS(sleepDur), stallThreshold, + func(name string, dur time.Duration) { + atomic.AddUint64(&stallCount, 1) + }) + defer closer.Close() + fs.(*diskHealthCheckingFS).tickInterval = 5 * time.Millisecond + ops := stallFilesystemOperations(fs) + for name, op := range ops { + t.Run(name, func(t *testing.T) { + before := atomic.LoadUint64(&stallCount) + op() + after := atomic.LoadUint64(&stallCount) + require.Greater(t, int(after-before), 0) + }) + } +} + +// TestDiskHealthChecking_Filesystem_Close tests the behavior of repeatedly +// closing and reusing a filesystem wrapped by WithDiskHealthChecks. This is a +// permitted usage because it allows (*pebble.Options).EnsureDefaults to wrap +// with disk-health checking by default, and to clean up the long-running +// goroutine on (*pebble.DB).Close, while still allowing the FS to be used +// multiple times. +func TestDiskHealthChecking_Filesystem_Close(t *testing.T) { + const stallThreshold = 10 * time.Millisecond + mockFS := &mockFS{ + create: func(name string) (File, error) { + time.Sleep(50 * time.Millisecond) + return &mockFile{}, nil + }, + } + + stalled := map[string]time.Duration{} + fs, closer := WithDiskHealthChecks(mockFS, stallThreshold, + func(name string, dur time.Duration) { stalled[name] = dur }) + fs.(*diskHealthCheckingFS).tickInterval = 5 * time.Millisecond + + files := []string{"foo", "bar", "bax"} + for _, filename := range files { + // Create will stall, and the detector should write to the stalled map + // with the filename. + _, _ = fs.Create(filename) + // Invoke the closer. This will cause the long-running goroutine to + // exit, but the fs should still be usable and should still detect + // subsequent stalls on the next iteration. + require.NoError(t, closer.Close()) + require.Contains(t, stalled, filename) + } +} diff --git a/vfs/disk_heath_test.go b/vfs/disk_heath_test.go deleted file mode 100644 index 15ec57c1ec..0000000000 --- a/vfs/disk_heath_test.go +++ /dev/null @@ -1,136 +0,0 @@ -// Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use -// of this source code is governed by a BSD-style license that can be found in -// the LICENSE file. - -package vfs - -import ( - "io" - "os" - "testing" - "time" -) - -type mockFile struct { - syncDuration time.Duration -} - -func (m mockFile) Close() error { - return nil -} - -func (m mockFile) Read(p []byte) (n int, err error) { - panic("unimplemented") -} - -func (m mockFile) ReadAt(p []byte, off int64) (n int, err error) { - panic("unimplemented") -} - -func (m mockFile) Write(p []byte) (n int, err error) { - time.Sleep(m.syncDuration) - return len(p), nil -} - -func (m mockFile) Stat() (os.FileInfo, error) { - panic("unimplemented") -} - -func (m mockFile) Sync() error { - time.Sleep(m.syncDuration) - return nil -} - -var _ File = &mockFile{} - -type mockFS struct { - syncDuration time.Duration -} - -func (m mockFS) Create(name string) (File, error) { - return mockFile(m), nil -} - -func (m mockFS) Link(oldname, newname string) error { - panic("unimplemented") -} - -func (m mockFS) Open(name string, opts ...OpenOption) (File, error) { - panic("unimplemented") -} - -func (m mockFS) OpenDir(name string) (File, error) { - panic("unimplemented") -} - -func (m mockFS) Remove(name string) error { - panic("unimplemented") -} - -func (m mockFS) RemoveAll(name string) error { - panic("unimplemented") -} - -func (m mockFS) Rename(oldname, newname string) error { - panic("unimplemented") -} - -func (m mockFS) ReuseForWrite(oldname, newname string) (File, error) { - return mockFile(m), nil -} - -func (m mockFS) MkdirAll(dir string, perm os.FileMode) error { - panic("unimplemented") -} - -func (m mockFS) Lock(name string) (io.Closer, error) { - panic("unimplemented") -} - -func (m mockFS) List(dir string) ([]string, error) { - panic("unimplemented") -} - -func (m mockFS) Stat(name string) (os.FileInfo, error) { - panic("unimplemented") -} - -func (m mockFS) PathBase(path string) string { - panic("unimplemented") -} - -func (m mockFS) PathJoin(elem ...string) string { - panic("unimplemented") -} - -func (m mockFS) PathDir(path string) string { - panic("unimplemented") -} - -func (m mockFS) GetDiskUsage(path string) (DiskUsage, error) { - panic("unimplemented") -} - -var _ FS = &mockFS{} - -func TestDiskHealthChecking(t *testing.T) { - diskSlow := make(chan time.Duration, 100) - slowThreshold := 1 * time.Second - mockFS := &mockFS{syncDuration: 3 * time.Second} - fs := WithDiskHealthChecks(mockFS, slowThreshold, func(s string, duration time.Duration) { - diskSlow <- duration - }) - dhFile, _ := fs.Create("test") - defer dhFile.Close() - - dhFile.Sync() - - select { - case d := <-diskSlow: - if d.Seconds() < slowThreshold.Seconds() { - t.Fatalf("expected %0.1f to be greater than threshold %0.1f", d.Seconds(), slowThreshold.Seconds()) - } - case <-time.After(5 * time.Second): - t.Fatal("disk stall detector did not detect slow disk operation") - } -} diff --git a/vfs/fd_test.go b/vfs/fd_test.go index 785b1f368b..2a7d5f9c10 100644 --- a/vfs/fd_test.go +++ b/vfs/fd_test.go @@ -22,7 +22,9 @@ func TestFileWrappersHaveFd(t *testing.T) { defer os.Remove(filename) // File wrapper case 1: Check if diskHealthCheckingFile has Fd(). - fs2 := WithDiskHealthChecks(Default, 10*time.Second, func(s string, duration time.Duration) {}) + fs2, closer := WithDiskHealthChecks(Default, 10*time.Second, + func(s string, duration time.Duration) {}) + defer closer.Close() f2, err := fs2.Open(filename) require.NoError(t, err) if _, ok := f2.(fdGetter); !ok { diff --git a/vfs/vfs.go b/vfs/vfs.go index 965bf373df..c816a8aaa0 100644 --- a/vfs/vfs.go +++ b/vfs/vfs.go @@ -18,6 +18,9 @@ import ( // // Typically, it will be an *os.File, but test code may choose to substitute // memory-backed implementations. +// +// Write-oriented operations (Write, Sync) must be called sequentially: At most +// 1 call to Write or Sync may be executed at any given time. type File interface { io.Closer io.Reader