Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vfs: Deflake TestDiskHealthChecking_File* #2734

Merged
merged 1 commit into from
Jul 12, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 70 additions & 48 deletions vfs/disk_health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"io"
"math"
"os"
"runtime"
"sync"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -216,6 +218,10 @@ var _ FS = &mockFS{}
func TestDiskHealthChecking_File(t *testing.T) {
oldTickInterval := defaultTickInterval
defaultTickInterval = time.Millisecond
if runtime.GOOS == "windows" {
t.Skipf("skipped on windows due to unreliable runtimes")
}

defer func() { defaultTickInterval = oldTickInterval }()

const (
Expand All @@ -229,43 +235,23 @@ func TestDiskHealthChecking_File(t *testing.T) {
writeDuration time.Duration
fn func(f File)
createWriteDelta time.Duration
expectStall bool
}{
// No false negatives.
{
op: OpTypeWrite,
writeSize: 5 * writeSizePrecision, // five KB
writeDuration: 100 * time.Millisecond,
fn: func(f File) { f.Write(fiveKB) },
expectStall: true,
},
{
op: OpTypeSync,
writeSize: 0,
writeDuration: 100 * time.Millisecond,
fn: func(f File) { f.Sync() },
expectStall: true,
},
// No false positives.
{
op: OpTypeWrite,
writeSize: 5,
writeDuration: 25 * time.Millisecond,
fn: func(f File) { f.Write([]byte("uh oh")) },
createWriteDelta: 100 * time.Millisecond,
expectStall: false,
},
{
op: OpTypeSync,
writeSize: 0,
writeDuration: 25 * time.Millisecond,
fn: func(f File) { f.Sync() },
expectStall: false,
},
}
for _, tc := range testCases {
t.Run(tc.op.String(), func(t *testing.T) {
diskSlow := make(chan DiskSlowInfo, 1)
diskSlow := make(chan DiskSlowInfo, 3)
mockFS := &mockFS{create: func(name string) (File, error) {
return mockFile{syncAndWriteDuration: tc.writeDuration}, nil
}}
Expand All @@ -283,25 +269,16 @@ func TestDiskHealthChecking_File(t *testing.T) {

tc.fn(dhFile)

if tc.expectStall { // no false negatives
select {
case i := <-diskSlow:
d := i.Duration
if d.Seconds() < slowThreshold.Seconds() {
t.Fatalf("expected %0.1f to be greater than threshold %0.1f", d.Seconds(), slowThreshold.Seconds())
}
require.Equal(t, tc.writeSize, i.WriteSize)
require.Equal(t, tc.op, i.OpType)
case <-time.After(200 * time.Millisecond):
t.Fatal("disk stall detector did not detect slow disk operation")
}
} else { // no false positives
select {
case <-diskSlow:
t.Fatal("disk stall detector detected a slow disk operation")
case <-time.After(200 * time.Millisecond):
return
select {
case i := <-diskSlow:
d := i.Duration
if d.Seconds() < slowThreshold.Seconds() {
t.Fatalf("expected %0.1f to be greater than threshold %0.1f", d.Seconds(), slowThreshold.Seconds())
}
require.Equal(t, tc.writeSize, i.WriteSize)
require.Equal(t, tc.op, i.OpType)
case <-time.After(10 * time.Second):
t.Fatal("disk stall detector did not detect slow disk operation")
}
})
}
Expand Down Expand Up @@ -457,7 +434,12 @@ func filesystemOpsMockFS(sleepDur time.Duration) *mockFS {
func stallFilesystemOperations(fs FS) []filesystemOperation {
return []filesystemOperation{
{
"create", OpTypeCreate, func() { _, _ = fs.Create("foo") },
"create", OpTypeCreate, func() {
f, _ := fs.Create("foo")
if f != nil {
f.Close()
}
},
},
{
"link", OpTypeLink, func() { _ = fs.Link("foo", "bar") },
Expand Down Expand Up @@ -489,15 +471,24 @@ type filesystemOperation struct {
func TestDiskHealthChecking_Filesystem(t *testing.T) {
const sleepDur = 50 * time.Millisecond
const stallThreshold = 10 * time.Millisecond
if runtime.GOOS == "windows" {
t.Skipf("skipped on windows due to unreliable runtimes")
}

// Wrap with disk-health checking, counting each stall via stallCount.
var expectedOpType OpType
var stallCount atomic.Uint64
onStall := make(chan struct{}, 10)
var lastOpType OpType
fs, closer := WithDiskHealthChecks(filesystemOpsMockFS(sleepDur), stallThreshold,
func(info DiskSlowInfo) {
require.Equal(t, 0, info.WriteSize)
require.Equal(t, expectedOpType, info.OpType)
stallCount.Add(1)
if lastOpType != info.OpType {
require.Equal(t, expectedOpType, info.OpType)
lastOpType = info.OpType
onStall <- struct{}{}
}
})
defer closer.Close()
fs.(*diskHealthCheckingFS).tickInterval = 5 * time.Millisecond
Expand All @@ -507,6 +498,11 @@ func TestDiskHealthChecking_Filesystem(t *testing.T) {
expectedOpType = o.opType
before := stallCount.Load()
o.f()
select {
case <-onStall:
case <-time.After(10 * time.Second):
t.Fatal("timed out waiting for stall")
}
after := stallCount.Load()
require.Greater(t, int(after-before), 0)
})
Expand All @@ -521,27 +517,53 @@ func TestDiskHealthChecking_Filesystem(t *testing.T) {
// multiple times.
func TestDiskHealthChecking_Filesystem_Close(t *testing.T) {
const stallThreshold = 10 * time.Millisecond
stallChan := make(chan struct{}, 1)
mockFS := &mockFS{
create: func(name string) (File, error) {
time.Sleep(50 * time.Millisecond)
<-stallChan
return &mockFile{}, nil
},
}

stalled := map[string]time.Duration{}
files := []string{"foo", "bar", "bax"}
var lastPath string
stalled := make(chan string)
fs, closer := WithDiskHealthChecks(mockFS, stallThreshold,
func(info DiskSlowInfo) { stalled[info.Path] = info.Duration })
func(info DiskSlowInfo) {
if lastPath != info.Path {
lastPath = info.Path
stalled <- info.Path
}
})
fs.(*diskHealthCheckingFS).tickInterval = 5 * time.Millisecond

files := []string{"foo", "bar", "bax"}
var wg sync.WaitGroup
for _, filename := range files {
// Create will stall, and the detector should write to the stalled map
filename := filename
// Create will stall, and the detector should write to the stalled channel
// with the filename.
_, _ = fs.Create(filename)
wg.Add(1)
go func() {
defer wg.Done()
f, _ := fs.Create(filename)
if f != nil {
f.Close()
}
}()

select {
case stalledPath := <-stalled:
require.Equal(t, filename, stalledPath)
case <-time.After(10 * time.Second):
t.Fatalf("timed out waiting for stall")
}
// Unblock the call to Create().
stallChan <- struct{}{}

// Invoke the closer. This will cause the long-running goroutine to
// exit, but the fs should still be usable and should still detect
// subsequent stalls on the next iteration.
require.NoError(t, closer.Close())
require.Contains(t, stalled, filename)
}
wg.Wait()
}