From b7b61d093a6ca1584dabf1234d6ffe516ebb482e Mon Sep 17 00:00:00 2001 From: Andrew Baptist Date: Tue, 17 Sep 2024 10:12:58 -0400 Subject: [PATCH] roachtest: remove all panics from perturbation/* tests Panics in the test cause the entire roachtest to stop. Instead we should call t.Fatal or similar methods. This commit passes test.Test to most methods and removes all panics from the perturbation tests. Epic: none Release note: None --- .../tests/admission_control_latency.go | 152 ++++++++---------- 1 file changed, 65 insertions(+), 87 deletions(-) diff --git a/pkg/cmd/roachtest/tests/admission_control_latency.go b/pkg/cmd/roachtest/tests/admission_control_latency.go index fe72d25d07e6..321969b1de52 100644 --- a/pkg/cmd/roachtest/tests/admission_control_latency.go +++ b/pkg/cmd/roachtest/tests/admission_control_latency.go @@ -281,16 +281,16 @@ type perturbation interface { // startTargetNode is called for custom logic starting the target node(s). // Some of the perturbations need special logic for starting the target // node. - startTargetNode(ctx context.Context, l *logger.Logger, v variations) + startTargetNode(ctx context.Context, t test.Test, v variations) // startPerturbation begins the system change and blocks until it is // finished. It returns the duration looking backwards to collect // performance stats. - startPerturbation(ctx context.Context, l *logger.Logger, v variations) time.Duration + startPerturbation(ctx context.Context, t test.Test, v variations) time.Duration // endPerturbation ends the system change. Not all perturbations do anything on stop. // It returns the duration looking backwards to collect performance stats. - endPerturbation(ctx context.Context, l *logger.Logger, v variations) time.Duration + endPerturbation(ctx context.Context, t test.Test, v variations) time.Duration } // backfill will create a backfill table during the startup and an index on it @@ -302,8 +302,8 @@ type backfill struct{} var _ perturbation = backfill{} // startTargetNode starts the target node and creates the backfill table. -func (b backfill) startTargetNode(ctx context.Context, l *logger.Logger, v variations) { - v.startNoBackup(ctx, l, v.targetNodes()) +func (b backfill) startTargetNode(ctx context.Context, t test.Test, v variations) { + v.startNoBackup(ctx, t, v.targetNodes()) // Create enough splits to start with one replica on each store. numSplits := v.vcpu * v.disks @@ -311,15 +311,15 @@ func (b backfill) startTargetNode(ctx context.Context, l *logger.Logger, v varia target := v.targetNodes()[0] initCmd := fmt.Sprintf("./cockroach workload init kv --db backfill --splits %d {pgurl:1}", numSplits) v.Run(ctx, option.WithNodes(v.Node(1)), initCmd) - db := v.Conn(ctx, l, 1) + db := v.Conn(ctx, t.L(), 1) defer db.Close() cmd := fmt.Sprintf("ALTER DATABASE backfill CONFIGURE ZONE USING constraints = '[+node%d]', lease_preferences='[[-node%d]]'", target, target) - if _, err := db.ExecContext(ctx, cmd); err != nil { - panic(err) - } - l.Printf("waiting for replicas to be in place") - v.waitForRebalanceToStop(ctx, l) + _, err := db.ExecContext(ctx, cmd) + require.NoError(t, err) + + t.L().Printf("waiting for replicas to be in place") + v.waitForRebalanceToStop(ctx, t) // Create and fill the backfill kv database before the test starts. We don't // want the fill to impact the test throughput. We use a larger block size @@ -329,29 +329,24 @@ func (b backfill) startTargetNode(ctx context.Context, l *logger.Logger, v varia v.perturbationDuration, 10_000, 10_000, v.stableNodes()) v.Run(ctx, option.WithNodes(v.workloadNodes()), runCmd) - l.Printf("waiting for io overload to end") - v.waitForIOOverloadToEnd(ctx, l) - v.waitForRebalanceToStop(ctx, l) + t.L().Printf("waiting for io overload to end") + v.waitForIOOverloadToEnd(ctx, t) + v.waitForRebalanceToStop(ctx, t) } // startPerturbation creates the index for the table. -func (b backfill) startPerturbation( - ctx context.Context, l *logger.Logger, v variations, -) time.Duration { - db := v.Conn(ctx, l, 1) +func (b backfill) startPerturbation(ctx context.Context, t test.Test, v variations) time.Duration { + db := v.Conn(ctx, t.L(), 1) defer db.Close() startTime := timeutil.Now() cmd := "CREATE INDEX backfill_index ON backfill.kv (k, v)" - if _, err := db.ExecContext(ctx, cmd); err != nil { - panic(err) - } + _, err := db.ExecContext(ctx, cmd) + require.NoError(t, err) return timeutil.Since(startTime) } // endPerturbation does nothing as the backfill database is already created. -func (b backfill) endPerturbation( - ctx context.Context, l *logger.Logger, v variations, -) time.Duration { +func (b backfill) endPerturbation(ctx context.Context, t test.Test, v variations) time.Duration { waitDuration(ctx, v.validationDuration) return v.validationDuration } @@ -361,14 +356,12 @@ type restart struct{} var _ perturbation = restart{} -func (r restart) startTargetNode(ctx context.Context, l *logger.Logger, v variations) { - v.startNoBackup(ctx, l, v.targetNodes()) +func (r restart) startTargetNode(ctx context.Context, t test.Test, v variations) { + v.startNoBackup(ctx, t, v.targetNodes()) } // startPerturbation stops the target node with a graceful shutdown. -func (r restart) startPerturbation( - ctx context.Context, l *logger.Logger, v variations, -) time.Duration { +func (r restart) startPerturbation(ctx context.Context, t test.Test, v variations) time.Duration { startTime := timeutil.Now() gracefulOpts := option.DefaultStopOpts() // SIGTERM for clean shutdown @@ -378,7 +371,7 @@ func (r restart) startPerturbation( gracefulOpts.RoachprodOpts.Sig = 9 } gracefulOpts.RoachprodOpts.Wait = true - v.Stop(ctx, l, gracefulOpts, v.targetNodes()) + v.Stop(ctx, t.L(), gracefulOpts, v.targetNodes()) waitDuration(ctx, v.perturbationDuration) if v.cleanRestart { return timeutil.Since(startTime) @@ -388,11 +381,9 @@ func (r restart) startPerturbation( } // endPerturbation restarts the node. -func (r restart) endPerturbation( - ctx context.Context, l *logger.Logger, v variations, -) time.Duration { +func (r restart) endPerturbation(ctx context.Context, t test.Test, v variations) time.Duration { startTime := timeutil.Now() - v.startNoBackup(ctx, l, v.targetNodes()) + v.startNoBackup(ctx, t, v.targetNodes()) waitDuration(ctx, v.validationDuration) return timeutil.Since(startTime) } @@ -412,17 +403,13 @@ type partition struct{} var _ perturbation = partition{} -func (p partition) startTargetNode(ctx context.Context, l *logger.Logger, v variations) { - v.startNoBackup(ctx, l, v.targetNodes()) +func (p partition) startTargetNode(ctx context.Context, t test.Test, v variations) { + v.startNoBackup(ctx, t, v.targetNodes()) } -func (p partition) startPerturbation( - ctx context.Context, l *logger.Logger, v variations, -) time.Duration { - targetIPs, err := v.InternalIP(ctx, l, v.targetNodes()) - if err != nil { - panic(err) - } +func (p partition) startPerturbation(ctx context.Context, t test.Test, v variations) time.Duration { + targetIPs, err := v.InternalIP(ctx, t.L(), v.targetNodes()) + require.NoError(t, err) if !v.IsLocal() { v.Run( @@ -437,9 +424,7 @@ func (p partition) startPerturbation( return v.perturbationDuration - 20*time.Second } -func (p partition) endPerturbation( - ctx context.Context, l *logger.Logger, v variations, -) time.Duration { +func (p partition) endPerturbation(ctx context.Context, t test.Test, v variations) time.Duration { startTime := timeutil.Now() if !v.IsLocal() { v.Run(ctx, v.withPartitionedNodes(v), `sudo iptables -F`) @@ -454,25 +439,23 @@ type addNode struct{} var _ perturbation = addNode{} -func (addNode) startTargetNode(ctx context.Context, l *logger.Logger, v variations) { +func (addNode) startTargetNode(ctx context.Context, t test.Test, v variations) { } -func (a addNode) startPerturbation( - ctx context.Context, l *logger.Logger, v variations, -) time.Duration { +func (a addNode) startPerturbation(ctx context.Context, t test.Test, v variations) time.Duration { startTime := timeutil.Now() - v.startNoBackup(ctx, l, v.targetNodes()) + v.startNoBackup(ctx, t, v.targetNodes()) // Wait out the time until the store is no longer suspect. The 11s is based // on the 10s server.time_after_store_suspect setting which we set below // plus 1 sec for the store to propagate its gossip information. waitDuration(ctx, 11*time.Second) - v.waitForRebalanceToStop(ctx, l) + v.waitForRebalanceToStop(ctx, t) return timeutil.Since(startTime) } // endPerturbation already waited for completion as part of start, so it doesn't // need to wait again here. -func (addNode) endPerturbation(ctx context.Context, l *logger.Logger, v variations) time.Duration { +func (addNode) endPerturbation(ctx context.Context, t test.Test, v variations) time.Duration { waitDuration(ctx, v.validationDuration) return v.validationDuration } @@ -482,17 +465,17 @@ type decommission struct{} var _ perturbation = restart{} -func (d decommission) startTargetNode(ctx context.Context, l *logger.Logger, v variations) { - v.startNoBackup(ctx, l, v.targetNodes()) +func (d decommission) startTargetNode(ctx context.Context, t test.Test, v variations) { + v.startNoBackup(ctx, t, v.targetNodes()) } func (d decommission) startPerturbation( - ctx context.Context, l *logger.Logger, v variations, + ctx context.Context, t test.Test, v variations, ) time.Duration { startTime := timeutil.Now() // TODO(baptist): If we want to support multiple decommissions in parallel, // run drain and decommission in separate goroutine. - l.Printf("draining target nodes") + t.L().Printf("draining target nodes") for _, node := range v.targetNodes() { drainCmd := fmt.Sprintf( "./cockroach node drain --self --certs-dir=%s --port={pgport:%d}", @@ -505,7 +488,7 @@ func (d decommission) startPerturbation( // Wait for all the other nodes to see the drain over gossip. time.Sleep(10 * time.Second) - l.Printf("decommissioning nodes") + t.L().Printf("decommissioning nodes") for _, node := range v.targetNodes() { decommissionCmd := fmt.Sprintf( "./cockroach node decommission --self --certs-dir=%s --port={pgport:%d}", @@ -515,15 +498,15 @@ func (d decommission) startPerturbation( v.Run(ctx, option.WithNodes(v.Node(node)), decommissionCmd) } - l.Printf("stopping decommissioned nodes") - v.Stop(ctx, l, option.DefaultStopOpts(), v.targetNodes()) + t.L().Printf("stopping decommissioned nodes") + v.Stop(ctx, t.L(), option.DefaultStopOpts(), v.targetNodes()) return timeutil.Since(startTime) } // endPerturbation already waited for completion as part of start, so it doesn't // need to wait again here. func (d decommission) endPerturbation( - ctx context.Context, l *logger.Logger, v variations, + ctx context.Context, t test.Test, v variations, ) time.Duration { waitDuration(ctx, v.validationDuration) return v.validationDuration @@ -642,8 +625,8 @@ func (v variations) runTest(ctx context.Context, t test.Test, c cluster.Cluster) m := c.NewMonitor(ctx, v.stableNodes()) // Start the stable nodes and let the perturbation start the target node(s). - v.startNoBackup(ctx, t.L(), v.stableNodes()) - v.perturbation.startTargetNode(ctx, t.L(), v) + v.startNoBackup(ctx, t, v.stableNodes()) + v.perturbation.startTargetNode(ctx, t, v) func() { // TODO(baptist): Remove this block once #120073 is fixed. @@ -670,7 +653,7 @@ func (v variations) runTest(ctx context.Context, t test.Test, c cluster.Cluster) // Wait for rebalancing to finish before starting to fill. This minimizes // the time to finish. - v.waitForRebalanceToStop(ctx, t.L()) + v.waitForRebalanceToStop(ctx, t) require.NoError(t, v.workload.initWorkload(ctx, v)) // Capture the stable rate near the last 1/4 of the fill process. @@ -679,7 +662,7 @@ func (v variations) runTest(ctx context.Context, t test.Test, c cluster.Cluster) // Wait for the first 3/4 of the duration and then measure the QPS in // the last 1/4. waitDuration(ctx, v.fillDuration*3/4) - clusterMaxRate <- v.measureQPS(ctx, t, t.L(), v.fillDuration*1/4) + clusterMaxRate <- v.measureQPS(ctx, t, v.fillDuration*1/4) return nil }) // Start filling the system without a rate. @@ -708,11 +691,11 @@ func (v variations) runTest(ctx context.Context, t test.Test, c cluster.Cluster) baselineInterval := intervalSince(v.validationDuration / 2) // Now start the perturbation. t.Status("T3: inducing perturbation") - perturbationDuration := v.perturbation.startPerturbation(ctx, t.L(), v) + perturbationDuration := v.perturbation.startPerturbation(ctx, t, v) perturbationInterval := intervalSince(perturbationDuration) t.Status("T4: recovery from the perturbation") - afterDuration := v.perturbation.endPerturbation(ctx, t.L(), v) + afterDuration := v.perturbation.endPerturbation(ctx, t, v) afterInterval := intervalSince(afterDuration) t.L().Printf("Baseline interval : %s", baselineInterval) @@ -806,9 +789,7 @@ func isAcceptableChange( } // startNoBackup starts the nodes without enabling backup. -func (v variations) startNoBackup( - ctx context.Context, l *logger.Logger, nodes option.NodeListOption, -) { +func (v variations) startNoBackup(ctx context.Context, t test.Test, nodes option.NodeListOption) { nodesPerRegion := v.numNodes / NUM_REGIONS for _, node := range nodes { // Don't start a backup schedule because this test is timing sensitive. @@ -816,15 +797,15 @@ func (v variations) startNoBackup( opts.RoachprodOpts.StoreCount = v.disks opts.RoachprodOpts.ExtraArgs = append(opts.RoachprodOpts.ExtraArgs, fmt.Sprintf("--locality=region=fake-%d", (node-1)/nodesPerRegion)) - v.Start(ctx, l, opts, install.MakeClusterSettings(), v.Node(node)) + v.Start(ctx, t.L(), opts, install.MakeClusterSettings(), v.Node(node)) } } // waitForRebalanceToStop polls the system.rangelog every second to see if there // have been any transfers in the last 5 seconds. It returns once the system // stops transferring replicas. -func (v variations) waitForRebalanceToStop(ctx context.Context, l *logger.Logger) { - db := v.Conn(ctx, l, 1) +func (v variations) waitForRebalanceToStop(ctx context.Context, t test.Test) { + db := v.Conn(ctx, t.L(), 1) defer db.Close() q := `SELECT extract_duration(seconds FROM now()-timestamp) FROM system.rangelog WHERE "eventType" = 'add_voter' ORDER BY timestamp DESC LIMIT 1` @@ -836,22 +817,23 @@ func (v variations) waitForRebalanceToStop(ctx context.Context, l *logger.Logger if row := db.QueryRow(q); row != nil { var secondsSinceLastEvent int if err := row.Scan(&secondsSinceLastEvent); err != nil && !errors.Is(err, gosql.ErrNoRows) { - panic(err) + t.Fatal(err) } if secondsSinceLastEvent > 5 { return } } } - panic("retry should not have exited") + // This loop should never end until success or fatal. + t.FailNow() } // waitForIOOverloadToEnd polls the system.metrics every second to see if there // is any IO overload on the target nodes. It returns once the overload ends. -func (v variations) waitForIOOverloadToEnd(ctx context.Context, l *logger.Logger) { +func (v variations) waitForIOOverloadToEnd(ctx context.Context, t test.Test) { var dbs []*gosql.DB for _, nodeId := range v.targetNodes() { - db := v.Conn(ctx, l, nodeId) + db := v.Conn(ctx, t.L(), nodeId) defer db.Close() dbs = append(dbs, db) } @@ -867,7 +849,7 @@ func (v variations) waitForIOOverloadToEnd(ctx context.Context, l *logger.Logger if row := db.QueryRow(q); row != nil { var overload float64 if err := row.Scan(&overload); err != nil && !errors.Is(err, gosql.ErrNoRows) { - panic(err) + t.Fatal(err) } if overload > 0.01 { anyOverloaded = true @@ -878,7 +860,8 @@ func (v variations) waitForIOOverloadToEnd(ctx context.Context, l *logger.Logger return } } - panic("retry should not have exited") + // This loop should never end until success or fatal. + t.FailNow() } func (v variations) workloadNodes() option.NodeListOption { @@ -899,16 +882,14 @@ func (v variations) targetNodes() option.NodeListOption { // duration is the interval to measure over. Setting too short of an interval // can mean inaccuracy in results. Setting too long of an interval may mean the // impact is blurred out. -func (v variations) measureQPS( - ctx context.Context, t test.Test, l *logger.Logger, duration time.Duration, -) int { +func (v variations) measureQPS(ctx context.Context, t test.Test, duration time.Duration) int { stableNodes := v.stableNodes() totalOpsCompleted := func() int { // NB: We can't hold the connection open during the full duration. var dbs []*gosql.DB for _, nodeId := range stableNodes { - db := v.Conn(ctx, l, nodeId) + db := v.Conn(ctx, t.L(), nodeId) defer db.Close() dbs = append(dbs, db) } @@ -928,10 +909,7 @@ func (v variations) measureQPS( }) } - if err := group.Wait(); err != nil { - t.Fatal(err) - } - + require.NoError(t, group.Wait()) return int(total) }