From cfa1e77e192c4e49a6f47b88459ccaf8a365cc9e Mon Sep 17 00:00:00 2001
From: Bilal Akhtar
Date: Wed, 12 Jul 2023 11:02:17 -0400
Subject: [PATCH] vfs: Deflake TestDiskHealthChecking_File*

Previously, these tests relied on sleeps and timing-based
synchronization between the disk health checking goroutine observing a
stall and the test code confirming it. This change synchronizes the two
events directly through channels, deflaking both tests.

Furthermore, the TestDiskHealthChecking_Filesystem_Close test
previously used a map in a thread-unsafe way, which increased the
chances of a flake.

It also closes the diskHealthCheckingFiles created in some Create
operations to prevent goroutine leaks.

Fixes #1718.
---
 vfs/disk_health_test.go | 73 ++++++++++++++++++++++++++++++++---------
 1 file changed, 58 insertions(+), 15 deletions(-)

diff --git a/vfs/disk_health_test.go b/vfs/disk_health_test.go
index a3246ad062..55243600de 100644
--- a/vfs/disk_health_test.go
+++ b/vfs/disk_health_test.go
@@ -8,6 +8,7 @@ import (
 	"io"
 	"math"
 	"os"
+	"sync"
 	"sync/atomic"
 	"testing"
 	"time"
@@ -250,7 +251,7 @@ func TestDiskHealthChecking_File(t *testing.T) {
 		{
 			op:               OpTypeWrite,
 			writeSize:        5,
-			writeDuration:    25 * time.Millisecond,
+			writeDuration:    5 * time.Millisecond,
 			fn:               func(f File) { f.Write([]byte("uh oh")) },
 			createWriteDelta: 100 * time.Millisecond,
 			expectStall:      false,
@@ -258,14 +259,14 @@ func TestDiskHealthChecking_File(t *testing.T) {
 		{
 			op:            OpTypeSync,
 			writeSize:     0,
-			writeDuration: 25 * time.Millisecond,
+			writeDuration: 5 * time.Millisecond,
 			fn:            func(f File) { f.Sync() },
 			expectStall:   false,
 		},
 	}
 	for _, tc := range testCases {
 		t.Run(tc.op.String(), func(t *testing.T) {
-			diskSlow := make(chan DiskSlowInfo, 1)
+			diskSlow := make(chan DiskSlowInfo, 3)
 			mockFS := &mockFS{create: func(name string) (File, error) {
 				return mockFile{syncAndWriteDuration: tc.writeDuration}, nil
 			}}
@@ -283,7 +284,7 @@ func TestDiskHealthChecking_File(t *testing.T) {
 
 			tc.fn(dhFile)
 
-			if tc.expectStall { // no false negatives
+			if tc.expectStall {
 				select {
 				case i := <-diskSlow:
 					d := i.Duration
@@ -292,14 +293,14 @@ func TestDiskHealthChecking_File(t *testing.T) {
 					}
 					require.Equal(t, tc.writeSize, i.WriteSize)
 					require.Equal(t, tc.op, i.OpType)
-				case <-time.After(200 * time.Millisecond):
+				case <-time.After(10 * time.Second):
 					t.Fatal("disk stall detector did not detect slow disk operation")
 				}
-			} else { // no false positives
+			} else {
 				select {
 				case <-diskSlow:
 					t.Fatal("disk stall detector detected a slow disk operation")
-				case <-time.After(200 * time.Millisecond):
+				case <-time.After(2 * slowThreshold):
 					return
 				}
 			}
@@ -457,7 +458,12 @@ func filesystemOpsMockFS(sleepDur time.Duration) *mockFS {
 func stallFilesystemOperations(fs FS) []filesystemOperation {
 	return []filesystemOperation{
 		{
-			"create", OpTypeCreate, func() { _, _ = fs.Create("foo") },
+			"create", OpTypeCreate, func() {
+				f, _ := fs.Create("foo")
+				if f != nil {
+					f.Close()
+				}
+			},
 		},
 		{
 			"link", OpTypeLink, func() { _ = fs.Link("foo", "bar") },
@@ -493,11 +499,17 @@ func TestDiskHealthChecking_Filesystem(t *testing.T) {
 	// Wrap with disk-health checking, counting each stall via stallCount.
 	var expectedOpType OpType
 	var stallCount atomic.Uint64
+	onStall := make(chan struct{}, 10)
+	var lastOpType OpType
 	fs, closer := WithDiskHealthChecks(filesystemOpsMockFS(sleepDur), stallThreshold,
 		func(info DiskSlowInfo) {
 			require.Equal(t, 0, info.WriteSize)
 			require.Equal(t, expectedOpType, info.OpType)
 			stallCount.Add(1)
+			if lastOpType != info.OpType {
+				lastOpType = info.OpType
+				onStall <- struct{}{}
+			}
 		})
 	defer closer.Close()
 	fs.(*diskHealthCheckingFS).tickInterval = 5 * time.Millisecond
@@ -507,6 +519,11 @@ func TestDiskHealthChecking_Filesystem(t *testing.T) {
 			expectedOpType = o.opType
 			before := stallCount.Load()
 			o.f()
+			select {
+			case <-onStall:
+			case <-time.After(10 * time.Second):
+				t.Fatal("timed out waiting for stall")
+			}
 			after := stallCount.Load()
 			require.Greater(t, int(after-before), 0)
 		})
@@ -521,27 +538,53 @@
 // multiple times.
 func TestDiskHealthChecking_Filesystem_Close(t *testing.T) {
 	const stallThreshold = 10 * time.Millisecond
+	stallChan := make(chan struct{}, 1)
 	mockFS := &mockFS{
 		create: func(name string) (File, error) {
-			time.Sleep(50 * time.Millisecond)
+			<-stallChan
 			return &mockFile{}, nil
 		},
 	}
 
-	stalled := map[string]time.Duration{}
+	files := []string{"foo", "bar", "bax"}
+	var lastPath string
+	stalled := make(chan string)
 	fs, closer := WithDiskHealthChecks(mockFS, stallThreshold,
-		func(info DiskSlowInfo) { stalled[info.Path] = info.Duration })
+		func(info DiskSlowInfo) {
+			if lastPath != info.Path {
+				lastPath = info.Path
+				stalled <- info.Path
+			}
+		})
 	fs.(*diskHealthCheckingFS).tickInterval = 5 * time.Millisecond
 
-	files := []string{"foo", "bar", "bax"}
+	var wg sync.WaitGroup
 	for _, filename := range files {
-		// Create will stall, and the detector should write to the stalled map
+		filename := filename
+		// Create will stall, and the detector should write to the stalled channel
 		// with the filename.
-		_, _ = fs.Create(filename)
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			f, _ := fs.Create(filename)
+			if f != nil {
+				f.Close()
+			}
+		}()
+
+		select {
+		case stalledPath := <-stalled:
+			require.Equal(t, filename, stalledPath)
+		case <-time.After(10 * time.Second):
+			t.Fatalf("timed out waiting for stall")
+		}
+		// Unblock the call to Create().
+		stallChan <- struct{}{}
+
		// Invoke the closer. This will cause the long-running goroutine to
 		// exit, but the fs should still be usable and should still detect
 		// subsequent stalls on the next iteration.
 		require.NoError(t, closer.Close())
-		require.Contains(t, stalled, filename)
 	}
+	wg.Wait()
 }
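
Note (illustrative, not part of the patch): the sketch below reduces the channel handshake used above to a standalone program. The names here (onStall, unblock, threshold, the detector goroutine) are made up for illustration and are not identifiers from the vfs package, which uses DiskSlowInfo callbacks and a mockFS instead.

package main

import (
	"fmt"
	"time"
)

func main() {
	const threshold = 10 * time.Millisecond

	unblock := make(chan struct{})    // test releases the stalled operation
	done := make(chan struct{})       // stops the detector goroutine
	onStall := make(chan struct{}, 1) // detector -> test: "I observed a stall"

	// Detector: stand-in for a background health-checking goroutine. It
	// polls and reports once the operation has been outstanding for
	// longer than threshold.
	start := time.Now()
	go func() {
		ticker := time.NewTicker(threshold / 2)
		defer ticker.Stop()
		for {
			select {
			case <-done:
				return
			case <-ticker.C:
				if time.Since(start) > threshold {
					select {
					case onStall <- struct{}{}: // non-blocking; report at most once
					default:
					}
				}
			}
		}
	}()

	// The "stalled" operation: blocks until the test releases it, instead
	// of sleeping for a tuned duration.
	opDone := make(chan struct{})
	go func() {
		defer close(opDone)
		<-unblock
	}()

	// Test side: block on the stall report with a generous timeout rather
	// than sleeping, then release the operation and shut everything down.
	select {
	case <-onStall:
		fmt.Println("stall observed")
	case <-time.After(10 * time.Second):
		fmt.Println("timed out waiting for stall")
	}
	close(unblock)
	<-opDone
	close(done)
}

The design point is the same as in the patch: the test waits on an event channel with a generous timeout instead of sleeping for a tuned interval, so a slow machine merely delays the test rather than flaking it.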