vfs: Deflake TestDiskHealthChecking_File*
Previously we relied on sleeps and other timing-based synchronization
between the disk health checking goroutine observing a stall and the
test code confirming it. This change synchronizes the two events
directly through channels, deflaking both tests. Furthermore,
TestDiskHealthChecking_Filesystem_Close previously wrote to a map from
the health-check callback while reading it from the test goroutine,
without synchronization, which increased the chances of a flake.
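
The channel-based pattern reduces to the following minimal,
self-contained sketch (illustrative only; onStall and the durations
are made up, not the test's actual names):

    // Stand-in for sleep-based polling replaced by direct signaling.
    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        onStall := make(chan time.Duration, 1)

        // Stand-in for the disk health checking goroutine: it signals
        // the test directly when it observes a stall, instead of the
        // test sleeping and hoping the detector has fired by then.
        go func() {
            stall := 20 * time.Millisecond
            time.Sleep(stall) // simulate a stalled operation
            onStall <- stall
        }()

        // Test side: block on the event itself, with a generous
        // timeout as a backstop rather than a tuned sleep.
        select {
        case d := <-onStall:
            fmt.Println("stall observed after", d)
        case <-time.After(10 * time.Second):
            fmt.Println("timed out waiting for stall")
        }
    }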

Also closes diskHealthCheckingFiles created in some Create operations
to prevent goroutine leaks.
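
Why an unclosed file leaks a goroutine, as a minimal hypothetical
sketch (monitoredFile is illustrative, not Pebble's actual
diskHealthCheckingFile API): the wrapper's monitor goroutine only
exits when Close is called.

    package main

    import (
        "fmt"
        "runtime"
        "time"
    )

    type monitoredFile struct {
        stop chan struct{}
    }

    func newMonitoredFile() *monitoredFile {
        f := &monitoredFile{stop: make(chan struct{})}
        go func() { // monitor goroutine; runs until Close
            ticker := time.NewTicker(time.Millisecond)
            defer ticker.Stop()
            for {
                select {
                case <-ticker.C: // would check for stalled ops here
                case <-f.stop:
                    return
                }
            }
        }()
        return f
    }

    func (f *monitoredFile) Close() { close(f.stop) }

    func main() {
        f := newMonitoredFile()
        fmt.Println("goroutines with file open:", runtime.NumGoroutine())
        f.Close() // without this, the monitor goroutine lives forever
        time.Sleep(10 * time.Millisecond)
        fmt.Println("goroutines after Close:", runtime.NumGoroutine())
    }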

Also deletes the TestDiskHealthChecking_File_* variants that tested
for the absence of false positives: given the tiny stall-detection
thresholds in the test, spurious stalls are bound to happen on busy
nodes.

Fixes cockroachdb#1718.
itsbilal committed Jul 12, 2023
1 parent 88bbab5 commit 4733dd1
Showing 1 changed file with 61 additions and 44 deletions.
vfs/disk_health_test.go
@@ -8,6 +8,7 @@ import (
     "io"
     "math"
     "os"
+    "sync"
     "sync/atomic"
     "testing"
     "time"
@@ -231,7 +232,6 @@ func TestDiskHealthChecking_File(t *testing.T) {
         createWriteDelta time.Duration
         expectStall      bool
     }{
-        // No false negatives.
         {
             op:        OpTypeWrite,
             writeSize: 5 * writeSizePrecision, // five KB
@@ -246,26 +246,10 @@ func TestDiskHealthChecking_File(t *testing.T) {
             fn:          func(f File) { f.Sync() },
             expectStall: true,
         },
-        // No false positives.
-        {
-            op:               OpTypeWrite,
-            writeSize:        5,
-            writeDuration:    25 * time.Millisecond,
-            fn:               func(f File) { f.Write([]byte("uh oh")) },
-            createWriteDelta: 100 * time.Millisecond,
-            expectStall:      false,
-        },
-        {
-            op:            OpTypeSync,
-            writeSize:     0,
-            writeDuration: 25 * time.Millisecond,
-            fn:            func(f File) { f.Sync() },
-            expectStall:   false,
-        },
     }
     for _, tc := range testCases {
         t.Run(tc.op.String(), func(t *testing.T) {
-            diskSlow := make(chan DiskSlowInfo, 1)
+            diskSlow := make(chan DiskSlowInfo, 3)
             mockFS := &mockFS{create: func(name string) (File, error) {
                 return mockFile{syncAndWriteDuration: tc.writeDuration}, nil
             }}
@@ -283,25 +267,16 @@ func TestDiskHealthChecking_File(t *testing.T) {
 
             tc.fn(dhFile)
 
-            if tc.expectStall { // no false negatives
-                select {
-                case i := <-diskSlow:
-                    d := i.Duration
-                    if d.Seconds() < slowThreshold.Seconds() {
-                        t.Fatalf("expected %0.1f to be greater than threshold %0.1f", d.Seconds(), slowThreshold.Seconds())
-                    }
-                    require.Equal(t, tc.writeSize, i.WriteSize)
-                    require.Equal(t, tc.op, i.OpType)
-                case <-time.After(200 * time.Millisecond):
-                    t.Fatal("disk stall detector did not detect slow disk operation")
-                }
-            } else { // no false positives
-                select {
-                case <-diskSlow:
-                    t.Fatal("disk stall detector detected a slow disk operation")
-                case <-time.After(200 * time.Millisecond):
-                    return
-                }
-            }
+            select {
+            case i := <-diskSlow:
+                d := i.Duration
+                if d.Seconds() < slowThreshold.Seconds() {
+                    t.Fatalf("expected %0.1f to be greater than threshold %0.1f", d.Seconds(), slowThreshold.Seconds())
+                }
+                require.Equal(t, tc.writeSize, i.WriteSize)
+                require.Equal(t, tc.op, i.OpType)
+            case <-time.After(10 * time.Second):
+                t.Fatal("disk stall detector did not detect slow disk operation")
+            }
         })
     }
@@ -457,7 +432,12 @@ func filesystemOpsMockFS(sleepDur time.Duration) *mockFS {
 func stallFilesystemOperations(fs FS) []filesystemOperation {
     return []filesystemOperation{
         {
-            "create", OpTypeCreate, func() { _, _ = fs.Create("foo") },
+            "create", OpTypeCreate, func() {
+                f, _ := fs.Create("foo")
+                if f != nil {
+                    f.Close()
+                }
+            },
         },
         {
             "link", OpTypeLink, func() { _ = fs.Link("foo", "bar") },
@@ -493,11 +473,17 @@ func TestDiskHealthChecking_Filesystem(t *testing.T) {
     // Wrap with disk-health checking, counting each stall via stallCount.
     var expectedOpType OpType
     var stallCount atomic.Uint64
+    onStall := make(chan struct{}, 10)
+    var lastOpType OpType
     fs, closer := WithDiskHealthChecks(filesystemOpsMockFS(sleepDur), stallThreshold,
         func(info DiskSlowInfo) {
             require.Equal(t, 0, info.WriteSize)
             require.Equal(t, expectedOpType, info.OpType)
             stallCount.Add(1)
+            if lastOpType != info.OpType {
+                lastOpType = info.OpType
+                onStall <- struct{}{}
+            }
         })
     defer closer.Close()
     fs.(*diskHealthCheckingFS).tickInterval = 5 * time.Millisecond
@@ -507,6 +493,11 @@ func TestDiskHealthChecking_Filesystem(t *testing.T) {
             expectedOpType = o.opType
             before := stallCount.Load()
             o.f()
+            select {
+            case <-onStall:
+            case <-time.After(10 * time.Second):
+                t.Fatal("timed out waiting for stall")
+            }
             after := stallCount.Load()
             require.Greater(t, int(after-before), 0)
         })
@@ -521,27 +512,53 @@ func TestDiskHealthChecking_Filesystem(t *testing.T) {
 // multiple times.
 func TestDiskHealthChecking_Filesystem_Close(t *testing.T) {
     const stallThreshold = 10 * time.Millisecond
+    stallChan := make(chan struct{}, 1)
     mockFS := &mockFS{
         create: func(name string) (File, error) {
-            time.Sleep(50 * time.Millisecond)
+            <-stallChan
             return &mockFile{}, nil
         },
     }
 
-    stalled := map[string]time.Duration{}
-    files := []string{"foo", "bar", "bax"}
+    var lastPath string
+    stalled := make(chan string)
     fs, closer := WithDiskHealthChecks(mockFS, stallThreshold,
-        func(info DiskSlowInfo) { stalled[info.Path] = info.Duration })
+        func(info DiskSlowInfo) {
+            if lastPath != info.Path {
+                lastPath = info.Path
+                stalled <- info.Path
+            }
+        })
     fs.(*diskHealthCheckingFS).tickInterval = 5 * time.Millisecond
 
+    files := []string{"foo", "bar", "bax"}
+    var wg sync.WaitGroup
     for _, filename := range files {
-        // Create will stall, and the detector should write to the stalled map
+        filename := filename
+        // Create will stall, and the detector should write to the stalled channel
         // with the filename.
-        _, _ = fs.Create(filename)
+        wg.Add(1)
+        go func() {
+            defer wg.Done()
+            f, _ := fs.Create(filename)
+            if f != nil {
+                f.Close()
+            }
+        }()
+
+        select {
+        case stalledPath := <-stalled:
+            require.Equal(t, filename, stalledPath)
+        case <-time.After(10 * time.Second):
+            t.Fatalf("timed out waiting for stall")
+        }
+        // Unblock the call to Create().
+        stallChan <- struct{}{}
 
         // Invoke the closer. This will cause the long-running goroutine to
         // exit, but the fs should still be usable and should still detect
         // subsequent stalls on the next iteration.
         require.NoError(t, closer.Close())
-        require.Contains(t, stalled, filename)
     }
+    wg.Wait()
 }
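
To make the map hazard the commit message mentions concrete, here is a
small hypothetical sketch (not part of the diff): writing to a plain
map from the slow-disk callback while the test reads it concurrently
is a data race that go test -race would flag, whereas an unbuffered
channel both orders the hand-off and delivers the value.

    package main

    import "fmt"

    func main() {
        // Racy version (don't do this):
        //   stalled := map[string]time.Duration{}
        //   go func() { stalled[path] = dur }() // write from callback goroutine
        //   _, ok := stalled[path]              // concurrent read in the test

        // Channel version: the send blocks until the test receives,
        // so the hand-off is ordered and race-free.
        stalled := make(chan string)
        go func() { stalled <- "foo" }() // stand-in for the DiskSlowInfo callback
        fmt.Println("stalled path:", <-stalled)
    }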
