diff --git a/vfs/disk_health_test.go b/vfs/disk_health_test.go
index a3246ad062..e2b6370b18 100644
--- a/vfs/disk_health_test.go
+++ b/vfs/disk_health_test.go
@@ -8,6 +8,7 @@ import (
 	"io"
 	"math"
 	"os"
+	"sync"
 	"sync/atomic"
 	"testing"
 	"time"
@@ -231,7 +232,6 @@ func TestDiskHealthChecking_File(t *testing.T) {
 		createWriteDelta time.Duration
 		expectStall      bool
 	}{
-		// No false negatives.
 		{
 			op:            OpTypeWrite,
 			writeSize:     5 * writeSizePrecision, // five KB
@@ -246,26 +246,10 @@ func TestDiskHealthChecking_File(t *testing.T) {
 			fn:            func(f File) { f.Sync() },
 			expectStall:   true,
 		},
-		// No false positives.
-		{
-			op:               OpTypeWrite,
-			writeSize:        5,
-			writeDuration:    25 * time.Millisecond,
-			fn:               func(f File) { f.Write([]byte("uh oh")) },
-			createWriteDelta: 100 * time.Millisecond,
-			expectStall:      false,
-		},
-		{
-			op:            OpTypeSync,
-			writeSize:     0,
-			writeDuration: 25 * time.Millisecond,
-			fn:            func(f File) { f.Sync() },
-			expectStall:   false,
-		},
 	}
 	for _, tc := range testCases {
 		t.Run(tc.op.String(), func(t *testing.T) {
-			diskSlow := make(chan DiskSlowInfo, 1)
+			diskSlow := make(chan DiskSlowInfo, 3)
 			mockFS := &mockFS{create: func(name string) (File, error) {
 				return mockFile{syncAndWriteDuration: tc.writeDuration}, nil
 			}}
@@ -283,25 +267,16 @@ func TestDiskHealthChecking_File(t *testing.T) {
 
 			tc.fn(dhFile)
 
-			if tc.expectStall { // no false negatives
-				select {
-				case i := <-diskSlow:
-					d := i.Duration
-					if d.Seconds() < slowThreshold.Seconds() {
-						t.Fatalf("expected %0.1f to be greater than threshold %0.1f", d.Seconds(), slowThreshold.Seconds())
-					}
-					require.Equal(t, tc.writeSize, i.WriteSize)
-					require.Equal(t, tc.op, i.OpType)
-				case <-time.After(200 * time.Millisecond):
-					t.Fatal("disk stall detector did not detect slow disk operation")
-				}
-			} else { // no false positives
-				select {
-				case <-diskSlow:
-					t.Fatal("disk stall detector detected a slow disk operation")
-				case <-time.After(200 * time.Millisecond):
-					return
+			select {
+			case i := <-diskSlow:
+				d := i.Duration
+				if d.Seconds() < slowThreshold.Seconds() {
+					t.Fatalf("expected %0.1f to be greater than threshold %0.1f", d.Seconds(), slowThreshold.Seconds())
 				}
+				require.Equal(t, tc.writeSize, i.WriteSize)
+				require.Equal(t, tc.op, i.OpType)
+			case <-time.After(10 * time.Second):
+				t.Fatal("disk stall detector did not detect slow disk operation")
 			}
 		})
 	}
@@ -457,7 +432,12 @@ func filesystemOpsMockFS(sleepDur time.Duration) *mockFS {
 func stallFilesystemOperations(fs FS) []filesystemOperation {
 	return []filesystemOperation{
 		{
-			"create", OpTypeCreate, func() { _, _ = fs.Create("foo") },
+			"create", OpTypeCreate, func() {
+				f, _ := fs.Create("foo")
+				if f != nil {
+					f.Close()
+				}
+			},
 		},
 		{
 			"link", OpTypeLink, func() { _ = fs.Link("foo", "bar") },
@@ -493,11 +473,17 @@ func TestDiskHealthChecking_Filesystem(t *testing.T) {
 	// Wrap with disk-health checking, counting each stall via stallCount.
 	var expectedOpType OpType
 	var stallCount atomic.Uint64
+	onStall := make(chan struct{}, 10)
+	var lastOpType OpType
 	fs, closer := WithDiskHealthChecks(filesystemOpsMockFS(sleepDur), stallThreshold,
 		func(info DiskSlowInfo) {
 			require.Equal(t, 0, info.WriteSize)
 			require.Equal(t, expectedOpType, info.OpType)
 			stallCount.Add(1)
+			if lastOpType != info.OpType {
+				lastOpType = info.OpType
+				onStall <- struct{}{}
+			}
 		})
 	defer closer.Close()
 	fs.(*diskHealthCheckingFS).tickInterval = 5 * time.Millisecond
@@ -507,6 +493,11 @@ func TestDiskHealthChecking_Filesystem(t *testing.T) {
 			expectedOpType = o.opType
 			before := stallCount.Load()
 			o.f()
+			select {
+			case <-onStall:
+			case <-time.After(10 * time.Second):
+				t.Fatal("timed out waiting for stall")
+			}
 			after := stallCount.Load()
 			require.Greater(t, int(after-before), 0)
 		})
@@ -521,27 +512,53 @@ func TestDiskHealthChecking_Filesystem(t *testing.T) {
 // multiple times.
 func TestDiskHealthChecking_Filesystem_Close(t *testing.T) {
 	const stallThreshold = 10 * time.Millisecond
+	stallChan := make(chan struct{}, 1)
 	mockFS := &mockFS{
 		create: func(name string) (File, error) {
-			time.Sleep(50 * time.Millisecond)
+			<-stallChan
 			return &mockFile{}, nil
 		},
 	}
 
-	stalled := map[string]time.Duration{}
+	files := []string{"foo", "bar", "bax"}
+	var lastPath string
+	stalled := make(chan string)
 	fs, closer := WithDiskHealthChecks(mockFS, stallThreshold,
-		func(info DiskSlowInfo) { stalled[info.Path] = info.Duration })
+		func(info DiskSlowInfo) {
+			if lastPath != info.Path {
+				lastPath = info.Path
+				stalled <- info.Path
+			}
+		})
 	fs.(*diskHealthCheckingFS).tickInterval = 5 * time.Millisecond
 
-	files := []string{"foo", "bar", "bax"}
+	var wg sync.WaitGroup
 	for _, filename := range files {
-		// Create will stall, and the detector should write to the stalled map
+		filename := filename
+		// Create will stall, and the detector should write to the stalled channel
 		// with the filename.
-		_, _ = fs.Create(filename)
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			f, _ := fs.Create(filename)
+			if f != nil {
+				f.Close()
+			}
+		}()
+
+		select {
+		case stalledPath := <-stalled:
+			require.Equal(t, filename, stalledPath)
+		case <-time.After(10 * time.Second):
+			t.Fatalf("timed out waiting for stall")
+		}
+		// Unblock the call to Create().
+		stallChan <- struct{}{}
+
 		// Invoke the closer. This will cause the long-running goroutine to
 		// exit, but the fs should still be usable and should still detect
 		// subsequent stalls on the next iteration.
 		require.NoError(t, closer.Close())
-		require.Contains(t, stalled, filename)
 	}
+	wg.Wait()
 }