From 9467f7236fb1eaef236c29c814a67c7d8baaf9e0 Mon Sep 17 00:00:00 2001
From: DarrylWong
Date: Tue, 8 Oct 2024 14:08:06 -0400
Subject: [PATCH 1/2] roachtest: periodically check if VMs have been preempted

When a VM is preempted, we occasionally observe a test that does not error out
and instead hangs until it times out. This is wasteful: the test may no longer
behave as expected, yet we keep running it and only check whether a VM was
preempted after the test has already failed or timed out.

This change proactively monitors VMs for any preemptions, cancelling the test
if any preempted VM is found.
---
 pkg/cmd/roachtest/cluster.go | 1 +
 pkg/cmd/roachtest/test_runner.go | 42 +++++++++-
 pkg/cmd/roachtest/test_test.go | 133 ++++++++++++++++++++++---------
 3 files changed, 136 insertions(+), 40 deletions(-)

diff --git a/pkg/cmd/roachtest/cluster.go b/pkg/cmd/roachtest/cluster.go
index acc480ce719c..4ef56ba60aa7 100644
--- a/pkg/cmd/roachtest/cluster.go
+++ b/pkg/cmd/roachtest/cluster.go
@@ -869,6 +869,7 @@ func (f *clusterFactory) clusterMock(cfg clusterConfig) *clusterImpl {
 return &clusterImpl{
 name: f.genName(cfg),
 expiration: timeutil.Now().Add(24 * time.Hour),
 r: f.r,
+ spec: cfg.spec,
 }
 }

diff --git a/pkg/cmd/roachtest/test_runner.go b/pkg/cmd/roachtest/test_runner.go
index 861dada4e6d5..e67b200fea98 100644
--- a/pkg/cmd/roachtest/test_runner.go
+++ b/pkg/cmd/roachtest/test_runner.go
@@ -1288,6 +1288,9 @@ func (r *testRunner) runTest(
 }()

 grafanaAnnotateTestStart(runCtx, t, c)
+ // Actively poll for VM preemptions, so we can bail out of tests early and
+ // avoid situations where a test times out and the flake assignment logic fails.
+ monitorForPreemptedVMs(runCtx, t, c, l)
 // This is the call to actually run the test.
 s.Run(runCtx, t, c)
 }()
@@ -1394,7 +1397,7 @@ func getVMNames(fullVMNames []string) string {
 // getPreemptedVMNames returns a comma separated list of preempted VM
 // names, or an empty string if no VM was preempted or an error was found.
 func getPreemptedVMNames(ctx context.Context, c *clusterImpl, l *logger.Logger) string {
- preemptedVMs, err := c.GetPreemptedVMs(ctx, l)
+ preemptedVMs, err := getPreemptedVMsHook(c, ctx, l)
 if err != nil {
 l.Printf("failed to check preempted VMs:\n%+v", err)
 return ""
@@ -2076,3 +2079,40 @@ func getTestParameters(t *testImpl, c *clusterImpl, createOpts *vm.CreateOpts) m

 return clusterParams
 }
+
+// getPreemptedVMsHook is a hook for unit tests to inject their own c.GetPreemptedVMs
+// implementation.
+var getPreemptedVMsHook = func(c cluster.Cluster, ctx context.Context, l *logger.Logger) ([]vm.PreemptedVM, error) {
+ return c.GetPreemptedVMs(ctx, l)
+}
+
+// pollPreemptionInterval is how often to poll for preempted VMs.
+var pollPreemptionInterval = 5 * time.Minute
+
+func monitorForPreemptedVMs(ctx context.Context, t test.Test, c cluster.Cluster, l *logger.Logger) {
+ if c.IsLocal() || !c.Spec().UseSpotVMs {
+ return
+ }
+
+ go func() {
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case <-time.After(pollPreemptionInterval):
+ preemptedVMs, err := getPreemptedVMsHook(c, ctx, l)
+ if err != nil {
+ l.Printf("WARN: monitorForPreemptedVMs: failed to check preempted VMs:\n%+v", err)
+ continue
+ }
+
+ // If we find any preemptions, fail the test. Note that we will recheck for
+ // preemptions in post failure processing, which will correctly assign this
+ // failure as an infra flake.
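+ // Erroring out here both fails the test and cancels it, so a hung test bails
+ // out within one polling interval rather than running until its full timeout.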
+ if len(preemptedVMs) != 0 { + t.Errorf("monitorForPreemptedVMs: Preempted VMs detected: %s", preemptedVMs) + } + } + } + }() +} diff --git a/pkg/cmd/roachtest/test_test.go b/pkg/cmd/roachtest/test_test.go index c384550d08ff..91f7f636575a 100644 --- a/pkg/cmd/roachtest/test_test.go +++ b/pkg/cmd/roachtest/test_test.go @@ -58,6 +58,25 @@ func nilLogger() *logger.Logger { return l } +func defaultClusterOpt() clustersOpt { + return clustersOpt{ + typ: roachprodCluster, + user: "test_user", + cpuQuota: 1000, + debugMode: NoDebug, + } +} + +func defaultLoggingOpt(buf *syncedBuffer) loggingOpt { + return loggingOpt{ + l: nilLogger(), + tee: logger.NoTee, + stdout: buf, + stderr: buf, + artifactsDir: "", + } +} + func TestRunnerRun(t *testing.T) { ctx := context.Background() @@ -234,12 +253,7 @@ func setupRunnerTest(t *testing.T, r testRegistryImpl, testFilters []string) *ru stderr: &stderr, artifactsDir: "", } - copt := clustersOpt{ - typ: roachprodCluster, - user: "test_user", - cpuQuota: 1000, - debugMode: NoDebug, - } + copt := defaultClusterOpt() return &runnerTest{ stdout: &stdout, stderr: &stderr, @@ -301,19 +315,8 @@ func TestRunnerTestTimeout(t *testing.T) { runner := newUnitTestRunner(cr, stopper) var buf syncedBuffer - lopt := loggingOpt{ - l: nilLogger(), - tee: logger.NoTee, - stdout: &buf, - stderr: &buf, - artifactsDir: "", - } - copt := clustersOpt{ - typ: roachprodCluster, - user: "test_user", - cpuQuota: 1000, - debugMode: NoDebug, - } + copt := defaultClusterOpt() + lopt := defaultLoggingOpt(&buf) test := registry.TestSpec{ Name: `timeout`, Owner: OwnerUnitTest, @@ -418,13 +421,8 @@ func runExitCodeTest(t *testing.T, injectedError error) error { require.NoError(t, err) tests, _ := testsToRun(r, tf, false, 1.0, true) - lopt := loggingOpt{ - l: nilLogger(), - tee: logger.NoTee, - stdout: io.Discard, - stderr: io.Discard, - artifactsDir: "", - } + var buf syncedBuffer + lopt := defaultLoggingOpt(&buf) return runner.Run(ctx, tests, 1, 1, clustersOpt{}, testOpts{}, lopt) } @@ -537,19 +535,8 @@ func TestTransientErrorFallback(t *testing.T) { runner := newUnitTestRunner(cr, stopper) var buf syncedBuffer - lopt := loggingOpt{ - l: nilLogger(), - tee: logger.NoTee, - stdout: &buf, - stderr: &buf, - artifactsDir: "", - } - copt := clustersOpt{ - typ: roachprodCluster, - user: "test_user", - cpuQuota: 1000, - debugMode: NoDebug, - } + copt := defaultClusterOpt() + lopt := defaultLoggingOpt(&buf) // Test that if a test fails with a transient error handled by the `require` package, // the test runner will correctly still identify it as a flake and the run will have @@ -594,3 +581,71 @@ func TestTransientErrorFallback(t *testing.T) { } }) } + +func TestVMPreemptionPolling(t *testing.T) { + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + cr := newClusterRegistry() + runner := newUnitTestRunner(cr, stopper) + + var buf syncedBuffer + copt := defaultClusterOpt() + lopt := defaultLoggingOpt(&buf) + + mockTest := registry.TestSpec{ + Name: `preemption`, + Owner: OwnerUnitTest, + Cluster: spec.MakeClusterSpec(0, spec.UseSpotVMs()), + CompatibleClouds: registry.AllExceptAWS, + Suites: registry.Suites(registry.Nightly), + CockroachBinary: registry.StandardCockroach, + Timeout: 10 * time.Second, + Run: func(ctx context.Context, t test.Test, c cluster.Cluster) { + <-ctx.Done() + }, + } + + getPreemptedVMsHook = func(c cluster.Cluster, ctx context.Context, l *logger.Logger) ([]vm.PreemptedVM, error) { + preemptedVMs := []vm.PreemptedVM{{ + Name: "test_node", + 
+ PreemptedAt: time.Now(),
+ }}
+ return preemptedVMs, nil
+ }
+
+ defer func() {
+ getPreemptedVMsHook = func(c cluster.Cluster, ctx context.Context, l *logger.Logger) ([]vm.PreemptedVM, error) {
+ return c.GetPreemptedVMs(ctx, l)
+ }
+ pollPreemptionInterval = 5 * time.Minute
+ }()
+
+ // Test that if a VM is preempted, the VM preemption monitor will catch
+ // it and cancel the test before it times out.
+ t.Run("polling cancels test", func(t *testing.T) {
+ pollPreemptionInterval = 50 * time.Millisecond
+
+ err := runner.Run(ctx, []registry.TestSpec{mockTest}, 1, /* count */
+ defaultParallelism, copt, testOpts{}, lopt)
+ // The preemption monitor should mark a VM as preempted and the test should
+ // be treated as a flake instead of a failed test.
+ require.NoError(t, err)
+ })
+
+ // Test that if a VM is preempted but the polling doesn't catch it because the
+ // test finished first, the post failure checks will check again and mark it as a flake.
+ t.Run("polling doesn't catch preemption", func(t *testing.T) {
+ // Set the interval very high so we don't poll for preemptions.
+ pollPreemptionInterval = 1 * time.Hour
+
+ mockTest.Run = func(ctx context.Context, t test.Test, c cluster.Cluster) {
+ t.Error("Should be ignored")
+ }
+ err := runner.Run(ctx, []registry.TestSpec{mockTest}, 1, /* count */
+ defaultParallelism, copt, testOpts{}, lopt)
+ // The post test failure check should mark a VM as preempted and the test should
+ // be treated as a flake instead of a failed test.
+ require.NoError(t, err)
+ })
+}

From 1f4456a1c9e2d669acba402360e74928f7395793 Mon Sep 17 00:00:00 2001
From: DarrylWong
Date: Fri, 15 Nov 2024 13:51:25 -0500
Subject: [PATCH 2/2] roachtest: fix TestVMPreemptionPolling data race

This change switches pollPreemptionInterval to a mutex-protected struct, since
multiple unit tests modify it and could otherwise trigger a data race.

Fixes: #135267
Epic: none
Release note: none
---
 pkg/cmd/roachtest/test_runner.go | 21 ++++++++++++++++++---
 pkg/cmd/roachtest/test_test.go | 12 +++++++++---
 2 files changed, 27 insertions(+), 6 deletions(-)

diff --git a/pkg/cmd/roachtest/test_runner.go b/pkg/cmd/roachtest/test_runner.go
index e67b200fea98..4c02009ed1ec 100644
--- a/pkg/cmd/roachtest/test_runner.go
+++ b/pkg/cmd/roachtest/test_runner.go
@@ -50,6 +50,12 @@ import (
 "github.com/petermattis/goid"
 )

+func init() {
+ pollPreemptionInterval.Lock()
+ defer pollPreemptionInterval.Unlock()
+ pollPreemptionInterval.interval = 5 * time.Minute
+}
+
 var (
 errTestsFailed = fmt.Errorf("some tests failed")

@@ -2086,20 +2092,29 @@ var getPreemptedVMsHook = func(c cluster.Cluster, ctx context.Context, l *logger
 return c.GetPreemptedVMs(ctx, l)
 }

-// pollPreemptionInterval is how often to poll for preempted VMs.
-var pollPreemptionInterval = 5 * time.Minute
+// pollPreemptionInterval is how often to poll for preempted VMs. We use a
+// mutex-protected struct to allow unit tests to safely modify it.
+// Interval defaults to 5 minutes if not set.
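+// Any read or write of interval must hold the mutex; the unit tests do so
+// through a small helper (setPollPreemptionInterval in test_test.go).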
+var pollPreemptionInterval struct { + syncutil.Mutex + interval time.Duration +} func monitorForPreemptedVMs(ctx context.Context, t test.Test, c cluster.Cluster, l *logger.Logger) { if c.IsLocal() || !c.Spec().UseSpotVMs { return } + pollPreemptionInterval.Lock() + defer pollPreemptionInterval.Unlock() + interval := pollPreemptionInterval.interval + go func() { for { select { case <-ctx.Done(): return - case <-time.After(pollPreemptionInterval): + case <-time.After(interval): preemptedVMs, err := getPreemptedVMsHook(c, ctx, l) if err != nil { l.Printf("WARN: monitorForPreemptedVMs: failed to check preempted VMs:\n%+v", err) diff --git a/pkg/cmd/roachtest/test_test.go b/pkg/cmd/roachtest/test_test.go index 91f7f636575a..fc8914f3a45a 100644 --- a/pkg/cmd/roachtest/test_test.go +++ b/pkg/cmd/roachtest/test_test.go @@ -606,6 +606,12 @@ func TestVMPreemptionPolling(t *testing.T) { }, } + setPollPreemptionInterval := func(interval time.Duration) { + pollPreemptionInterval.Lock() + defer pollPreemptionInterval.Unlock() + pollPreemptionInterval.interval = interval + } + getPreemptedVMsHook = func(c cluster.Cluster, ctx context.Context, l *logger.Logger) ([]vm.PreemptedVM, error) { preemptedVMs := []vm.PreemptedVM{{ Name: "test_node", @@ -618,13 +624,13 @@ func TestVMPreemptionPolling(t *testing.T) { getPreemptedVMsHook = func(c cluster.Cluster, ctx context.Context, l *logger.Logger) ([]vm.PreemptedVM, error) { return c.GetPreemptedVMs(ctx, l) } - pollPreemptionInterval = 5 * time.Minute + setPollPreemptionInterval(5 * time.Minute) }() // Test that if a VM is preempted, the VM preemption monitor will catch // it and cancel the test before it times out. t.Run("polling cancels test", func(t *testing.T) { - pollPreemptionInterval = 50 * time.Millisecond + setPollPreemptionInterval(50 * time.Millisecond) err := runner.Run(ctx, []registry.TestSpec{mockTest}, 1, /* count */ defaultParallelism, copt, testOpts{}, lopt) @@ -637,7 +643,7 @@ func TestVMPreemptionPolling(t *testing.T) { // test finished first, the post failure checks will check again and mark it as a flake. t.Run("polling doesn't catch preemption", func(t *testing.T) { // Set the interval very high so we don't poll for preemptions. - pollPreemptionInterval = 1 * time.Hour + setPollPreemptionInterval(1 * time.Hour) mockTest.Run = func(ctx context.Context, t test.Test, c cluster.Cluster) { t.Error("Should be ignored")