Skip to content

Commit

Permalink
vfs: Deflake TestDiskHealthChecking_File*
Browse files Browse the repository at this point in the history
Previously we were relying on sleeps and timing-based ways of
synchronization between observing a stall in the disk health checking
goroutine and confirming for it in the test code itself. This change
adds a more direct synchronization between the two events through
the use of channels, to deflake both tests. Furthermore,
the TestDiskHealthChecking_Filesystem_Close test was previously
doing a relatively thread-unsafe use of a map, which increased
the chances of a flake.

Also closes diskHealthCheckingFiles created in some Create operations
to prevent goroutine leaks.

Also selectively skips two tests on windows that almost exclusively
just flaked there.

Fixes cockroachdb#1718.
  • Loading branch information
itsbilal committed Jul 12, 2023
1 parent 88bbab5 commit f7c8b53
Showing 1 changed file with 67 additions and 16 deletions.
83 changes: 67 additions & 16 deletions vfs/disk_health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"io"
"math"
"os"
"runtime"
"sync"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -216,6 +218,10 @@ var _ FS = &mockFS{}
func TestDiskHealthChecking_File(t *testing.T) {
oldTickInterval := defaultTickInterval
defaultTickInterval = time.Millisecond
if runtime.GOOS == "windows" {
t.Skipf("skipped on windows due to unreliable runtimes")
}

defer func() { defaultTickInterval = oldTickInterval }()

const (
Expand Down Expand Up @@ -250,22 +256,22 @@ func TestDiskHealthChecking_File(t *testing.T) {
{
op: OpTypeWrite,
writeSize: 5,
writeDuration: 25 * time.Millisecond,
writeDuration: 5 * time.Millisecond,
fn: func(f File) { f.Write([]byte("uh oh")) },
createWriteDelta: 100 * time.Millisecond,
expectStall: false,
},
{
op: OpTypeSync,
writeSize: 0,
writeDuration: 25 * time.Millisecond,
writeDuration: 5 * time.Millisecond,
fn: func(f File) { f.Sync() },
expectStall: false,
},
}
for _, tc := range testCases {
t.Run(tc.op.String(), func(t *testing.T) {
diskSlow := make(chan DiskSlowInfo, 1)
diskSlow := make(chan DiskSlowInfo, 3)
mockFS := &mockFS{create: func(name string) (File, error) {
return mockFile{syncAndWriteDuration: tc.writeDuration}, nil
}}
Expand All @@ -283,7 +289,7 @@ func TestDiskHealthChecking_File(t *testing.T) {

tc.fn(dhFile)

if tc.expectStall { // no false negatives
if tc.expectStall {
select {
case i := <-diskSlow:
d := i.Duration
Expand All @@ -292,14 +298,14 @@ func TestDiskHealthChecking_File(t *testing.T) {
}
require.Equal(t, tc.writeSize, i.WriteSize)
require.Equal(t, tc.op, i.OpType)
case <-time.After(200 * time.Millisecond):
case <-time.After(10 * time.Second):
t.Fatal("disk stall detector did not detect slow disk operation")
}
} else { // no false positives
} else {
select {
case <-diskSlow:
t.Fatal("disk stall detector detected a slow disk operation")
case <-time.After(200 * time.Millisecond):
case <-time.After(2 * slowThreshold):
return
}
}
Expand Down Expand Up @@ -457,7 +463,12 @@ func filesystemOpsMockFS(sleepDur time.Duration) *mockFS {
func stallFilesystemOperations(fs FS) []filesystemOperation {
return []filesystemOperation{
{
"create", OpTypeCreate, func() { _, _ = fs.Create("foo") },
"create", OpTypeCreate, func() {
f, _ := fs.Create("foo")
if f != nil {
f.Close()
}
},
},
{
"link", OpTypeLink, func() { _ = fs.Link("foo", "bar") },
Expand Down Expand Up @@ -489,15 +500,24 @@ type filesystemOperation struct {
func TestDiskHealthChecking_Filesystem(t *testing.T) {
const sleepDur = 50 * time.Millisecond
const stallThreshold = 10 * time.Millisecond
if runtime.GOOS == "windows" {
t.Skipf("skipped on windows due to unreliable runtimes")
}

// Wrap with disk-health checking, counting each stall via stallCount.
var expectedOpType OpType
var stallCount atomic.Uint64
onStall := make(chan struct{}, 10)
var lastOpType OpType
fs, closer := WithDiskHealthChecks(filesystemOpsMockFS(sleepDur), stallThreshold,
func(info DiskSlowInfo) {
require.Equal(t, 0, info.WriteSize)
require.Equal(t, expectedOpType, info.OpType)
stallCount.Add(1)
if lastOpType != info.OpType {
require.Equal(t, expectedOpType, info.OpType)
lastOpType = info.OpType
onStall <- struct{}{}
}
})
defer closer.Close()
fs.(*diskHealthCheckingFS).tickInterval = 5 * time.Millisecond
Expand All @@ -507,6 +527,11 @@ func TestDiskHealthChecking_Filesystem(t *testing.T) {
expectedOpType = o.opType
before := stallCount.Load()
o.f()
select {
case <-onStall:
case <-time.After(10 * time.Second):
t.Fatal("timed out waiting for stall")
}
after := stallCount.Load()
require.Greater(t, int(after-before), 0)
})
Expand All @@ -521,27 +546,53 @@ func TestDiskHealthChecking_Filesystem(t *testing.T) {
// multiple times.
func TestDiskHealthChecking_Filesystem_Close(t *testing.T) {
const stallThreshold = 10 * time.Millisecond
stallChan := make(chan struct{}, 1)
mockFS := &mockFS{
create: func(name string) (File, error) {
time.Sleep(50 * time.Millisecond)
<-stallChan
return &mockFile{}, nil
},
}

stalled := map[string]time.Duration{}
files := []string{"foo", "bar", "bax"}
var lastPath string
stalled := make(chan string)
fs, closer := WithDiskHealthChecks(mockFS, stallThreshold,
func(info DiskSlowInfo) { stalled[info.Path] = info.Duration })
func(info DiskSlowInfo) {
if lastPath != info.Path {
lastPath = info.Path
stalled <- info.Path
}
})
fs.(*diskHealthCheckingFS).tickInterval = 5 * time.Millisecond

files := []string{"foo", "bar", "bax"}
var wg sync.WaitGroup
for _, filename := range files {
// Create will stall, and the detector should write to the stalled map
filename := filename
// Create will stall, and the detector should write to the stalled channel
// with the filename.
_, _ = fs.Create(filename)
wg.Add(1)
go func() {
defer wg.Done()
f, _ := fs.Create(filename)
if f != nil {
f.Close()
}
}()

select {
case stalledPath := <-stalled:
require.Equal(t, filename, stalledPath)
case <-time.After(10 * time.Second):
t.Fatalf("timed out waiting for stall")
}
// Unblock the call to Create().
stallChan <- struct{}{}

// Invoke the closer. This will cause the long-running goroutine to
// exit, but the fs should still be usable and should still detect
// subsequent stalls on the next iteration.
require.NoError(t, closer.Close())
require.Contains(t, stalled, filename)
}
wg.Wait()
}

0 comments on commit f7c8b53

Please sign in to comment.