release-24.1: roachtest: periodically check if VMs have been preempted #135748

Merged
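
In brief, this backport has the roachtest runner poll spot-VM clusters for preemptions while a test is running and fail the test early, rather than letting it run into its timeout and confuse the flake-assignment logic. The diff below shows the actual wiring (monitorForPreemptedVMs, a test hook around c.GetPreemptedVMs, and a mutex-protected poll interval); what follows is only a minimal, self-contained sketch of that polling pattern, using placeholder names (monitorPreemptions, checkPreempted, failTest) rather than the roachtest API.

package main

import (
	"context"
	"fmt"
	"time"
)

// monitorPreemptions starts a background goroutine that wakes up every
// interval, asks checkPreempted which VMs (if any) were preempted, and
// reports them through failTest so the test stops early instead of
// running into its timeout.
func monitorPreemptions(
	ctx context.Context,
	interval time.Duration,
	checkPreempted func(context.Context) ([]string, error),
	failTest func(format string, args ...interface{}),
) {
	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case <-time.After(interval):
				preempted, err := checkPreempted(ctx)
				if err != nil {
					// Keep polling; a transient error should not fail the test.
					fmt.Printf("WARN: failed to check preempted VMs: %v\n", err)
					continue
				}
				if len(preempted) != 0 {
					// In the real change, post-failure processing rechecks
					// preemptions and classifies this as an infra flake.
					failTest("preempted VMs detected: %v", preempted)
				}
			}
		}
	}()
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	// Fake checker that reports a preemption on every poll.
	check := func(context.Context) ([]string, error) {
		return []string{"test-node-3"}, nil
	}
	fail := func(format string, args ...interface{}) {
		fmt.Printf("test failed: "+format+"\n", args...)
	}

	monitorPreemptions(ctx, 200*time.Millisecond, check, fail)
	<-ctx.Done()
}
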
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/cluster.go
@@ -833,6 +833,7 @@ func (f *clusterFactory) clusterMock(cfg clusterConfig) *clusterImpl {
name: f.genName(cfg),
expiration: timeutil.Now().Add(24 * time.Hour),
r: f.r,
spec: cfg.spec,
}
}

58 changes: 57 additions & 1 deletion pkg/cmd/roachtest/test_runner.go
@@ -26,6 +26,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/build"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestflags"
@@ -48,6 +49,12 @@ import (
"github.com/petermattis/goid"
)

func init() {
pollPreemptionInterval.Lock()
defer pollPreemptionInterval.Unlock()
pollPreemptionInterval.interval = 5 * time.Minute
}

var (
errTestsFailed = fmt.Errorf("some tests failed")

@@ -1219,6 +1226,9 @@ func (r *testRunner) runTest(
}
}()

// Actively poll for VM preemptions, so we can bail out of tests early and
// avoid situations where a test times out and the flake assignment logic fails.
monitorForPreemptedVMs(runCtx, t, c, l)
// This is the call to actually run the test.
s.Run(runCtx, t, c)
}()
@@ -1313,7 +1323,7 @@ func getVMNames(fullVMNames []string) string {
// getPreemptedVMNames returns a comma separated list of preempted VM
// names, or an empty string if no VM was preempted or an error was found.
func getPreemptedVMNames(ctx context.Context, c *clusterImpl, l *logger.Logger) string {
preemptedVMs, err := c.GetPreemptedVMs(ctx, l)
preemptedVMs, err := getPreemptedVMsHook(c, ctx, l)
if err != nil {
l.Printf("failed to check preempted VMs:\n%+v", err)
return ""
@@ -1915,3 +1925,49 @@ func getTestParameters(t *testImpl, c *clusterImpl, createOpts *vm.CreateOpts) m

return clusterParams
}

// getPreemptedVMsHook is a hook for unit tests to inject their own c.GetPreemptedVMs
// implementation.
var getPreemptedVMsHook = func(c cluster.Cluster, ctx context.Context, l *logger.Logger) ([]vm.PreemptedVM, error) {
return c.GetPreemptedVMs(ctx, l)
}

// pollPreemptionInterval is how often to poll for preempted VMs. We use a
// mutex-protected struct so that unit tests can safely modify it. The
// interval defaults to 5 minutes, set in init above.
var pollPreemptionInterval struct {
syncutil.Mutex
interval time.Duration
}

func monitorForPreemptedVMs(ctx context.Context, t test.Test, c cluster.Cluster, l *logger.Logger) {
if c.IsLocal() || !c.Spec().UseSpotVMs {
return
}

pollPreemptionInterval.Lock()
defer pollPreemptionInterval.Unlock()
interval := pollPreemptionInterval.interval

go func() {
for {
select {
case <-ctx.Done():
return
case <-time.After(interval):
preemptedVMs, err := getPreemptedVMsHook(c, ctx, l)
if err != nil {
l.Printf("WARN: monitorForPreemptedVMs: failed to check preempted VMs:\n%+v", err)
continue
}

// If we find any preemptions, fail the test. Note that we will recheck for
// preemptions in post-failure processing, which will correctly assign this
// failure as an infra flake.
if len(preemptedVMs) != 0 {
t.Errorf("monitorForPreemptedVMs: Preempted VMs detected: %s", preemptedVMs)
}
}
}
}()
}
139 changes: 100 additions & 39 deletions pkg/cmd/roachtest/test_test.go
@@ -58,6 +58,25 @@ func nilLogger() *logger.Logger {
return l
}

func defaultClusterOpt() clustersOpt {
return clustersOpt{
typ: roachprodCluster,
user: "test_user",
cpuQuota: 1000,
debugMode: NoDebug,
}
}

func defaultLoggingOpt(buf *syncedBuffer) loggingOpt {
return loggingOpt{
l: nilLogger(),
tee: logger.NoTee,
stdout: buf,
stderr: buf,
artifactsDir: "",
}
}

func TestRunnerRun(t *testing.T) {
ctx := context.Background()

@@ -234,12 +253,7 @@ func setupRunnerTest(t *testing.T, r testRegistryImpl, testFilters []string) *ru
stderr: &stderr,
artifactsDir: "",
}
copt := clustersOpt{
typ: roachprodCluster,
user: "test_user",
cpuQuota: 1000,
debugMode: NoDebug,
}
copt := defaultClusterOpt()
return &runnerTest{
stdout: &stdout,
stderr: &stderr,
@@ -301,19 +315,8 @@ func TestRunnerTestTimeout(t *testing.T) {
runner := newUnitTestRunner(cr, stopper)

var buf syncedBuffer
lopt := loggingOpt{
l: nilLogger(),
tee: logger.NoTee,
stdout: &buf,
stderr: &buf,
artifactsDir: "",
}
copt := clustersOpt{
typ: roachprodCluster,
user: "test_user",
cpuQuota: 1000,
debugMode: NoDebug,
}
copt := defaultClusterOpt()
lopt := defaultLoggingOpt(&buf)
test := registry.TestSpec{
Name: `timeout`,
Owner: OwnerUnitTest,
@@ -418,13 +421,8 @@ func runExitCodeTest(t *testing.T, injectedError error) error {
require.NoError(t, err)

tests, _ := testsToRun(r, tf, false, 1.0, true)
lopt := loggingOpt{
l: nilLogger(),
tee: logger.NoTee,
stdout: io.Discard,
stderr: io.Discard,
artifactsDir: "",
}
var buf syncedBuffer
lopt := defaultLoggingOpt(&buf)
return runner.Run(ctx, tests, 1, 1, clustersOpt{}, testOpts{}, lopt)
}

@@ -537,19 +535,8 @@ func TestTransientErrorFallback(t *testing.T) {
runner := newUnitTestRunner(cr, stopper)

var buf syncedBuffer
lopt := loggingOpt{
l: nilLogger(),
tee: logger.NoTee,
stdout: &buf,
stderr: &buf,
artifactsDir: "",
}
copt := clustersOpt{
typ: roachprodCluster,
user: "test_user",
cpuQuota: 1000,
debugMode: NoDebug,
}
copt := defaultClusterOpt()
lopt := defaultLoggingOpt(&buf)

// Test that if a test fails with a transient error handled by the `require` package,
// the test runner will correctly still identify it as a flake and the run will have
@@ -594,3 +581,77 @@ }
}
})
}

func TestVMPreemptionPolling(t *testing.T) {
ctx := context.Background()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
cr := newClusterRegistry()
runner := newUnitTestRunner(cr, stopper)

var buf syncedBuffer
copt := defaultClusterOpt()
lopt := defaultLoggingOpt(&buf)

mockTest := registry.TestSpec{
Name: `preemption`,
Owner: OwnerUnitTest,
Cluster: spec.MakeClusterSpec(0, spec.UseSpotVMs()),
CompatibleClouds: registry.AllExceptAWS,
Suites: registry.Suites(registry.Nightly),
CockroachBinary: registry.StandardCockroach,
Timeout: 10 * time.Second,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
<-ctx.Done()
},
}

setPollPreemptionInterval := func(interval time.Duration) {
pollPreemptionInterval.Lock()
defer pollPreemptionInterval.Unlock()
pollPreemptionInterval.interval = interval
}

getPreemptedVMsHook = func(c cluster.Cluster, ctx context.Context, l *logger.Logger) ([]vm.PreemptedVM, error) {
preemptedVMs := []vm.PreemptedVM{{
Name: "test_node",
PreemptedAt: time.Now(),
}}
return preemptedVMs, nil
}

defer func() {
getPreemptedVMsHook = func(c cluster.Cluster, ctx context.Context, l *logger.Logger) ([]vm.PreemptedVM, error) {
return c.GetPreemptedVMs(ctx, l)
}
setPollPreemptionInterval(5 * time.Minute)
}()

// Test that if a VM is preempted, the VM preemption monitor will catch
// it and cancel the test before it times out.
t.Run("polling cancels test", func(t *testing.T) {
setPollPreemptionInterval(50 * time.Millisecond)

err := runner.Run(ctx, []registry.TestSpec{mockTest}, 1, /* count */
defaultParallelism, copt, testOpts{}, lopt)
// The preemption monitor should mark a VM as preempted and the test should
// be treated as a flake instead of a failed test.
require.NoError(t, err)
})

// Test that if a VM is preempted but the polling doesn't catch it because the
// test finished first, the post-failure checks will detect it and mark the test as a flake.
t.Run("polling doesn't catch preemption", func(t *testing.T) {
// Set the interval very high so we don't poll for preemptions.
setPollPreemptionInterval(1 * time.Hour)

mockTest.Run = func(ctx context.Context, t test.Test, c cluster.Cluster) {
t.Error("Should be ignored")
}
err := runner.Run(ctx, []registry.TestSpec{mockTest}, 1, /* count */
defaultParallelism, copt, testOpts{}, lopt)
// The post-test failure check should mark a VM as preempted and the test should
// be treated as a flake instead of a failed test.
require.NoError(t, err)
})
}
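
For context on when the new monitor actually engages: monitorForPreemptedVMs returns immediately for local clusters and for clusters not provisioned on spot VMs, so a test opts in through its cluster spec. A registration along the following lines would be covered by the new polling; the register function, test name, and owner are illustrative assumptions (only spec.UseSpotVMs() and the TestSpec fields mirror the diff above), following the usual roachtest r.Add pattern.

package tests // illustrative; real roachtest registrations live under pkg/cmd/roachtest/tests

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
)

// registerSpotVMExample registers a test that runs on spot VMs; the preemption
// monitor added in this PR polls such clusters in the background while Run executes.
func registerSpotVMExample(r registry.Registry) {
	r.Add(registry.TestSpec{
		Name:             "example/spot-vms",    // hypothetical test name
		Owner:            registry.OwnerTestEng, // hypothetical owner
		Cluster:          spec.MakeClusterSpec(4, spec.UseSpotVMs()),
		CompatibleClouds: registry.AllExceptAWS,
		Suites:           registry.Suites(registry.Nightly),
		Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
			// If a spot VM is reclaimed while this runs, the monitor fails the
			// test promptly and post-failure checks classify it as an infra flake.
		},
	})
}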