vfs: Deflake TestDiskHealthChecking_File*
Previously we relied on sleeps and timing-based synchronization between
the disk health checking goroutine observing a stall and the test code
confirming it. This change synchronizes the two events directly through
channels (sketched below), to deflake both tests. Furthermore, the
TestDiskHealthChecking_Filesystem_Close test previously used a map in a
thread-unsafe way, which increased the chances of a flake.

Also closes diskHealthCheckingFiles created in some Create operations
to prevent goroutine leaks.

Removes some test cases that checked for false positives on disk stalls:
at the small thresholds that were used, scheduler delays alone can
produce perceived disk stalls.

Also skips two tests on Windows, where nearly all of their flakes
occurred.

Fixes #1718.
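As an illustration of the channel-based approach described above, here is a
minimal, self-contained sketch. It is not the actual vfs test code:
slowMonitor, the durations, and the callback wiring are hypothetical
stand-ins. The monitoring goroutine signals a stall over a channel, and the
test blocks on that channel with a generous timeout instead of sleeping and
then inspecting shared state.

```go
package main

import (
	"fmt"
	"time"
)

// slowMonitor stands in for the disk health checking goroutine: it polls the
// age of a simulated operation and invokes onStall once the operation has
// been outstanding for longer than the threshold.
func slowMonitor(opDuration, threshold time.Duration, onStall func()) {
	go func() {
		start := time.Now()
		ticker := time.NewTicker(time.Millisecond)
		defer ticker.Stop()
		for range ticker.C {
			elapsed := time.Since(start)
			if elapsed > threshold {
				onStall()
				return
			}
			if elapsed > opDuration {
				return // operation finished without stalling
			}
		}
	}()
}

func main() {
	// Test side: the callback signals over a buffered channel, and the test
	// waits on the channel with a long timeout rather than a fixed sleep.
	stalled := make(chan struct{}, 1)
	slowMonitor(100*time.Millisecond, 10*time.Millisecond, func() {
		select {
		case stalled <- struct{}{}:
		default: // drop duplicate notifications
		}
	})

	select {
	case <-stalled:
		fmt.Println("observed stall")
	case <-time.After(10 * time.Second):
		fmt.Println("timed out waiting for stall")
	}
}
```

The diff below follows the same shape: the DiskSlowInfo callback writes to a
channel (diskSlow, onStall, or stalled), and each test selects on that channel
with a ten-second timeout.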
itsbilal committed Jul 12, 2023
1 parent 88bbab5 commit 6458b11
118 changes: 70 additions & 48 deletions vfs/disk_health_test.go
@@ -8,6 +8,8 @@ import (
"io"
"math"
"os"
"runtime"
"sync"
"sync/atomic"
"testing"
"time"
@@ -216,6 +218,10 @@ var _ FS = &mockFS{}
func TestDiskHealthChecking_File(t *testing.T) {
oldTickInterval := defaultTickInterval
defaultTickInterval = time.Millisecond
if runtime.GOOS == "windows" {
t.Skipf("skipped on windows due to unreliable runtimes")
}

defer func() { defaultTickInterval = oldTickInterval }()

const (
@@ -229,43 +235,23 @@ func TestDiskHealthChecking_File(t *testing.T) {
writeDuration time.Duration
fn func(f File)
createWriteDelta time.Duration
expectStall bool
}{
// No false negatives.
{
op: OpTypeWrite,
writeSize: 5 * writeSizePrecision, // five KB
writeDuration: 100 * time.Millisecond,
fn: func(f File) { f.Write(fiveKB) },
expectStall: true,
},
{
op: OpTypeSync,
writeSize: 0,
writeDuration: 100 * time.Millisecond,
fn: func(f File) { f.Sync() },
expectStall: true,
},
// No false positives.
{
op: OpTypeWrite,
writeSize: 5,
writeDuration: 25 * time.Millisecond,
fn: func(f File) { f.Write([]byte("uh oh")) },
createWriteDelta: 100 * time.Millisecond,
expectStall: false,
},
{
op: OpTypeSync,
writeSize: 0,
writeDuration: 25 * time.Millisecond,
fn: func(f File) { f.Sync() },
expectStall: false,
},
}
for _, tc := range testCases {
t.Run(tc.op.String(), func(t *testing.T) {
diskSlow := make(chan DiskSlowInfo, 1)
diskSlow := make(chan DiskSlowInfo, 3)
mockFS := &mockFS{create: func(name string) (File, error) {
return mockFile{syncAndWriteDuration: tc.writeDuration}, nil
}}
@@ -283,25 +269,16 @@ func TestDiskHealthChecking_File(t *testing.T) {

tc.fn(dhFile)

if tc.expectStall { // no false negatives
select {
case i := <-diskSlow:
d := i.Duration
if d.Seconds() < slowThreshold.Seconds() {
t.Fatalf("expected %0.1f to be greater than threshold %0.1f", d.Seconds(), slowThreshold.Seconds())
}
require.Equal(t, tc.writeSize, i.WriteSize)
require.Equal(t, tc.op, i.OpType)
case <-time.After(200 * time.Millisecond):
t.Fatal("disk stall detector did not detect slow disk operation")
}
} else { // no false positives
select {
case <-diskSlow:
t.Fatal("disk stall detector detected a slow disk operation")
case <-time.After(200 * time.Millisecond):
return
select {
case i := <-diskSlow:
d := i.Duration
if d.Seconds() < slowThreshold.Seconds() {
t.Fatalf("expected %0.1f to be greater than threshold %0.1f", d.Seconds(), slowThreshold.Seconds())
}
require.Equal(t, tc.writeSize, i.WriteSize)
require.Equal(t, tc.op, i.OpType)
case <-time.After(10 * time.Second):
t.Fatal("disk stall detector did not detect slow disk operation")
}
})
}
@@ -457,7 +434,12 @@ func filesystemOpsMockFS(sleepDur time.Duration) *mockFS {
func stallFilesystemOperations(fs FS) []filesystemOperation {
return []filesystemOperation{
{
"create", OpTypeCreate, func() { _, _ = fs.Create("foo") },
"create", OpTypeCreate, func() {
f, _ := fs.Create("foo")
if f != nil {
f.Close()
}
},
},
{
"link", OpTypeLink, func() { _ = fs.Link("foo", "bar") },
@@ -489,15 +471,24 @@ type filesystemOperation struct {
func TestDiskHealthChecking_Filesystem(t *testing.T) {
const sleepDur = 50 * time.Millisecond
const stallThreshold = 10 * time.Millisecond
if runtime.GOOS == "windows" {
t.Skipf("skipped on windows due to unreliable runtimes")
}

// Wrap with disk-health checking, counting each stall via stallCount.
var expectedOpType OpType
var stallCount atomic.Uint64
onStall := make(chan struct{}, 10)
var lastOpType OpType
fs, closer := WithDiskHealthChecks(filesystemOpsMockFS(sleepDur), stallThreshold,
func(info DiskSlowInfo) {
require.Equal(t, 0, info.WriteSize)
require.Equal(t, expectedOpType, info.OpType)
stallCount.Add(1)
if lastOpType != info.OpType {
require.Equal(t, expectedOpType, info.OpType)
lastOpType = info.OpType
onStall <- struct{}{}
}
})
defer closer.Close()
fs.(*diskHealthCheckingFS).tickInterval = 5 * time.Millisecond
@@ -507,6 +498,11 @@ func TestDiskHealthChecking_Filesystem(t *testing.T) {
expectedOpType = o.opType
before := stallCount.Load()
o.f()
select {
case <-onStall:
case <-time.After(10 * time.Second):
t.Fatal("timed out waiting for stall")
}
after := stallCount.Load()
require.Greater(t, int(after-before), 0)
})
@@ -521,27 +517,53 @@ func TestDiskHealthChecking_Filesystem(t *testing.T) {
// multiple times.
func TestDiskHealthChecking_Filesystem_Close(t *testing.T) {
const stallThreshold = 10 * time.Millisecond
stallChan := make(chan struct{}, 1)
mockFS := &mockFS{
create: func(name string) (File, error) {
time.Sleep(50 * time.Millisecond)
<-stallChan
return &mockFile{}, nil
},
}

stalled := map[string]time.Duration{}
files := []string{"foo", "bar", "bax"}
var lastPath string
stalled := make(chan string)
fs, closer := WithDiskHealthChecks(mockFS, stallThreshold,
func(info DiskSlowInfo) { stalled[info.Path] = info.Duration })
func(info DiskSlowInfo) {
if lastPath != info.Path {
lastPath = info.Path
stalled <- info.Path
}
})
fs.(*diskHealthCheckingFS).tickInterval = 5 * time.Millisecond

files := []string{"foo", "bar", "bax"}
var wg sync.WaitGroup
for _, filename := range files {
// Create will stall, and the detector should write to the stalled map
filename := filename
// Create will stall, and the detector should write to the stalled channel
// with the filename.
_, _ = fs.Create(filename)
wg.Add(1)
go func() {
defer wg.Done()
f, _ := fs.Create(filename)
if f != nil {
f.Close()
}
}()

select {
case stalledPath := <-stalled:
require.Equal(t, filename, stalledPath)
case <-time.After(10 * time.Second):
t.Fatalf("timed out waiting for stall")
}
// Unblock the call to Create().
stallChan <- struct{}{}

// Invoke the closer. This will cause the long-running goroutine to
// exit, but the fs should still be usable and should still detect
// subsequent stalls on the next iteration.
require.NoError(t, closer.Close())
require.Contains(t, stalled, filename)
}
wg.Wait()
}
