Skip to content

Commit

Permalink
Merge #130866
Browse files Browse the repository at this point in the history
130866: roachtest: remove all panics from perturbation/* tests r=renatolabs a=andrewbaptist

Panics in the test cause the entire roachtest to stop. Instead we should call t.Fatal or similar methods. This commit passes test.Test to most methods and removes all panics from the perturbation tests.

Epic: none

Release note: None

Co-authored-by: Andrew Baptist <[email protected]>
  • Loading branch information
craig[bot] and andrewbaptist committed Sep 17, 2024
2 parents 7de9aca + b7b61d0 commit 29ad4d3
Showing 1 changed file with 65 additions and 87 deletions.
152 changes: 65 additions & 87 deletions pkg/cmd/roachtest/tests/admission_control_latency.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,16 +281,16 @@ type perturbation interface {
// startTargetNode is called for custom logic starting the target node(s).
// Some of the perturbations need special logic for starting the target
// node.
startTargetNode(ctx context.Context, l *logger.Logger, v variations)
startTargetNode(ctx context.Context, t test.Test, v variations)

// startPerturbation begins the system change and blocks until it is
// finished. It returns the duration looking backwards to collect
// performance stats.
startPerturbation(ctx context.Context, l *logger.Logger, v variations) time.Duration
startPerturbation(ctx context.Context, t test.Test, v variations) time.Duration

// endPerturbation ends the system change. Not all perturbations do anything on stop.
// It returns the duration looking backwards to collect performance stats.
endPerturbation(ctx context.Context, l *logger.Logger, v variations) time.Duration
endPerturbation(ctx context.Context, t test.Test, v variations) time.Duration
}

// backfill will create a backfill table during the startup and an index on it
Expand All @@ -302,24 +302,24 @@ type backfill struct{}
var _ perturbation = backfill{}

// startTargetNode starts the target node and creates the backfill table.
func (b backfill) startTargetNode(ctx context.Context, l *logger.Logger, v variations) {
v.startNoBackup(ctx, l, v.targetNodes())
func (b backfill) startTargetNode(ctx context.Context, t test.Test, v variations) {
v.startNoBackup(ctx, t, v.targetNodes())

// Create enough splits to start with one replica on each store.
numSplits := v.vcpu * v.disks
// TODO(baptist): Handle multiple target nodes.
target := v.targetNodes()[0]
initCmd := fmt.Sprintf("./cockroach workload init kv --db backfill --splits %d {pgurl:1}", numSplits)
v.Run(ctx, option.WithNodes(v.Node(1)), initCmd)
db := v.Conn(ctx, l, 1)
db := v.Conn(ctx, t.L(), 1)
defer db.Close()

cmd := fmt.Sprintf("ALTER DATABASE backfill CONFIGURE ZONE USING constraints = '[+node%d]', lease_preferences='[[-node%d]]'", target, target)
if _, err := db.ExecContext(ctx, cmd); err != nil {
panic(err)
}
l.Printf("waiting for replicas to be in place")
v.waitForRebalanceToStop(ctx, l)
_, err := db.ExecContext(ctx, cmd)
require.NoError(t, err)

t.L().Printf("waiting for replicas to be in place")
v.waitForRebalanceToStop(ctx, t)

// Create and fill the backfill kv database before the test starts. We don't
// want the fill to impact the test throughput. We use a larger block size
Expand All @@ -329,29 +329,24 @@ func (b backfill) startTargetNode(ctx context.Context, l *logger.Logger, v varia
v.perturbationDuration, 10_000, 10_000, v.stableNodes())
v.Run(ctx, option.WithNodes(v.workloadNodes()), runCmd)

l.Printf("waiting for io overload to end")
v.waitForIOOverloadToEnd(ctx, l)
v.waitForRebalanceToStop(ctx, l)
t.L().Printf("waiting for io overload to end")
v.waitForIOOverloadToEnd(ctx, t)
v.waitForRebalanceToStop(ctx, t)
}

// startPerturbation creates the index for the table.
func (b backfill) startPerturbation(
ctx context.Context, l *logger.Logger, v variations,
) time.Duration {
db := v.Conn(ctx, l, 1)
func (b backfill) startPerturbation(ctx context.Context, t test.Test, v variations) time.Duration {
db := v.Conn(ctx, t.L(), 1)
defer db.Close()
startTime := timeutil.Now()
cmd := "CREATE INDEX backfill_index ON backfill.kv (k, v)"
if _, err := db.ExecContext(ctx, cmd); err != nil {
panic(err)
}
_, err := db.ExecContext(ctx, cmd)
require.NoError(t, err)
return timeutil.Since(startTime)
}

// endPerturbation does nothing as the backfill database is already created.
func (b backfill) endPerturbation(
ctx context.Context, l *logger.Logger, v variations,
) time.Duration {
func (b backfill) endPerturbation(ctx context.Context, t test.Test, v variations) time.Duration {
waitDuration(ctx, v.validationDuration)
return v.validationDuration
}
Expand All @@ -361,14 +356,12 @@ type restart struct{}

var _ perturbation = restart{}

func (r restart) startTargetNode(ctx context.Context, l *logger.Logger, v variations) {
v.startNoBackup(ctx, l, v.targetNodes())
func (r restart) startTargetNode(ctx context.Context, t test.Test, v variations) {
v.startNoBackup(ctx, t, v.targetNodes())
}

// startPerturbation stops the target node with a graceful shutdown.
func (r restart) startPerturbation(
ctx context.Context, l *logger.Logger, v variations,
) time.Duration {
func (r restart) startPerturbation(ctx context.Context, t test.Test, v variations) time.Duration {
startTime := timeutil.Now()
gracefulOpts := option.DefaultStopOpts()
// SIGTERM for clean shutdown
Expand All @@ -378,7 +371,7 @@ func (r restart) startPerturbation(
gracefulOpts.RoachprodOpts.Sig = 9
}
gracefulOpts.RoachprodOpts.Wait = true
v.Stop(ctx, l, gracefulOpts, v.targetNodes())
v.Stop(ctx, t.L(), gracefulOpts, v.targetNodes())
waitDuration(ctx, v.perturbationDuration)
if v.cleanRestart {
return timeutil.Since(startTime)
Expand All @@ -388,11 +381,9 @@ func (r restart) startPerturbation(
}

// endPerturbation restarts the node.
func (r restart) endPerturbation(
ctx context.Context, l *logger.Logger, v variations,
) time.Duration {
func (r restart) endPerturbation(ctx context.Context, t test.Test, v variations) time.Duration {
startTime := timeutil.Now()
v.startNoBackup(ctx, l, v.targetNodes())
v.startNoBackup(ctx, t, v.targetNodes())
waitDuration(ctx, v.validationDuration)
return timeutil.Since(startTime)
}
Expand All @@ -412,17 +403,13 @@ type partition struct{}

var _ perturbation = partition{}

func (p partition) startTargetNode(ctx context.Context, l *logger.Logger, v variations) {
v.startNoBackup(ctx, l, v.targetNodes())
func (p partition) startTargetNode(ctx context.Context, t test.Test, v variations) {
v.startNoBackup(ctx, t, v.targetNodes())
}

func (p partition) startPerturbation(
ctx context.Context, l *logger.Logger, v variations,
) time.Duration {
targetIPs, err := v.InternalIP(ctx, l, v.targetNodes())
if err != nil {
panic(err)
}
func (p partition) startPerturbation(ctx context.Context, t test.Test, v variations) time.Duration {
targetIPs, err := v.InternalIP(ctx, t.L(), v.targetNodes())
require.NoError(t, err)

if !v.IsLocal() {
v.Run(
Expand All @@ -437,9 +424,7 @@ func (p partition) startPerturbation(
return v.perturbationDuration - 20*time.Second
}

func (p partition) endPerturbation(
ctx context.Context, l *logger.Logger, v variations,
) time.Duration {
func (p partition) endPerturbation(ctx context.Context, t test.Test, v variations) time.Duration {
startTime := timeutil.Now()
if !v.IsLocal() {
v.Run(ctx, v.withPartitionedNodes(v), `sudo iptables -F`)
Expand All @@ -454,25 +439,23 @@ type addNode struct{}

var _ perturbation = addNode{}

func (addNode) startTargetNode(ctx context.Context, l *logger.Logger, v variations) {
func (addNode) startTargetNode(ctx context.Context, t test.Test, v variations) {
}

func (a addNode) startPerturbation(
ctx context.Context, l *logger.Logger, v variations,
) time.Duration {
func (a addNode) startPerturbation(ctx context.Context, t test.Test, v variations) time.Duration {
startTime := timeutil.Now()
v.startNoBackup(ctx, l, v.targetNodes())
v.startNoBackup(ctx, t, v.targetNodes())
// Wait out the time until the store is no longer suspect. The 11s is based
// on the 10s server.time_after_store_suspect setting which we set below
// plus 1 sec for the store to propagate its gossip information.
waitDuration(ctx, 11*time.Second)
v.waitForRebalanceToStop(ctx, l)
v.waitForRebalanceToStop(ctx, t)
return timeutil.Since(startTime)
}

// endPerturbation already waited for completion as part of start, so it doesn't
// need to wait again here.
func (addNode) endPerturbation(ctx context.Context, l *logger.Logger, v variations) time.Duration {
func (addNode) endPerturbation(ctx context.Context, t test.Test, v variations) time.Duration {
waitDuration(ctx, v.validationDuration)
return v.validationDuration
}
Expand All @@ -482,17 +465,17 @@ type decommission struct{}

var _ perturbation = restart{}

func (d decommission) startTargetNode(ctx context.Context, l *logger.Logger, v variations) {
v.startNoBackup(ctx, l, v.targetNodes())
func (d decommission) startTargetNode(ctx context.Context, t test.Test, v variations) {
v.startNoBackup(ctx, t, v.targetNodes())
}

func (d decommission) startPerturbation(
ctx context.Context, l *logger.Logger, v variations,
ctx context.Context, t test.Test, v variations,
) time.Duration {
startTime := timeutil.Now()
// TODO(baptist): If we want to support multiple decommissions in parallel,
// run drain and decommission in separate goroutine.
l.Printf("draining target nodes")
t.L().Printf("draining target nodes")
for _, node := range v.targetNodes() {
drainCmd := fmt.Sprintf(
"./cockroach node drain --self --certs-dir=%s --port={pgport:%d}",
Expand All @@ -505,7 +488,7 @@ func (d decommission) startPerturbation(
// Wait for all the other nodes to see the drain over gossip.
time.Sleep(10 * time.Second)

l.Printf("decommissioning nodes")
t.L().Printf("decommissioning nodes")
for _, node := range v.targetNodes() {
decommissionCmd := fmt.Sprintf(
"./cockroach node decommission --self --certs-dir=%s --port={pgport:%d}",
Expand All @@ -515,15 +498,15 @@ func (d decommission) startPerturbation(
v.Run(ctx, option.WithNodes(v.Node(node)), decommissionCmd)
}

l.Printf("stopping decommissioned nodes")
v.Stop(ctx, l, option.DefaultStopOpts(), v.targetNodes())
t.L().Printf("stopping decommissioned nodes")
v.Stop(ctx, t.L(), option.DefaultStopOpts(), v.targetNodes())
return timeutil.Since(startTime)
}

// endPerturbation already waited for completion as part of start, so it doesn't
// need to wait again here.
func (d decommission) endPerturbation(
ctx context.Context, l *logger.Logger, v variations,
ctx context.Context, t test.Test, v variations,
) time.Duration {
waitDuration(ctx, v.validationDuration)
return v.validationDuration
Expand Down Expand Up @@ -642,8 +625,8 @@ func (v variations) runTest(ctx context.Context, t test.Test, c cluster.Cluster)
m := c.NewMonitor(ctx, v.stableNodes())

// Start the stable nodes and let the perturbation start the target node(s).
v.startNoBackup(ctx, t.L(), v.stableNodes())
v.perturbation.startTargetNode(ctx, t.L(), v)
v.startNoBackup(ctx, t, v.stableNodes())
v.perturbation.startTargetNode(ctx, t, v)

func() {
// TODO(baptist): Remove this block once #120073 is fixed.
Expand All @@ -670,7 +653,7 @@ func (v variations) runTest(ctx context.Context, t test.Test, c cluster.Cluster)

// Wait for rebalancing to finish before starting to fill. This minimizes
// the time to finish.
v.waitForRebalanceToStop(ctx, t.L())
v.waitForRebalanceToStop(ctx, t)
require.NoError(t, v.workload.initWorkload(ctx, v))

// Capture the stable rate near the last 1/4 of the fill process.
Expand All @@ -679,7 +662,7 @@ func (v variations) runTest(ctx context.Context, t test.Test, c cluster.Cluster)
// Wait for the first 3/4 of the duration and then measure the QPS in
// the last 1/4.
waitDuration(ctx, v.fillDuration*3/4)
clusterMaxRate <- v.measureQPS(ctx, t, t.L(), v.fillDuration*1/4)
clusterMaxRate <- v.measureQPS(ctx, t, v.fillDuration*1/4)
return nil
})
// Start filling the system without a rate.
Expand Down Expand Up @@ -708,11 +691,11 @@ func (v variations) runTest(ctx context.Context, t test.Test, c cluster.Cluster)
baselineInterval := intervalSince(v.validationDuration / 2)
// Now start the perturbation.
t.Status("T3: inducing perturbation")
perturbationDuration := v.perturbation.startPerturbation(ctx, t.L(), v)
perturbationDuration := v.perturbation.startPerturbation(ctx, t, v)
perturbationInterval := intervalSince(perturbationDuration)

t.Status("T4: recovery from the perturbation")
afterDuration := v.perturbation.endPerturbation(ctx, t.L(), v)
afterDuration := v.perturbation.endPerturbation(ctx, t, v)
afterInterval := intervalSince(afterDuration)

t.L().Printf("Baseline interval : %s", baselineInterval)
Expand Down Expand Up @@ -806,25 +789,23 @@ func isAcceptableChange(
}

// startNoBackup starts the nodes without enabling backup.
func (v variations) startNoBackup(
ctx context.Context, l *logger.Logger, nodes option.NodeListOption,
) {
func (v variations) startNoBackup(ctx context.Context, t test.Test, nodes option.NodeListOption) {
nodesPerRegion := v.numNodes / NUM_REGIONS
for _, node := range nodes {
// Don't start a backup schedule because this test is timing sensitive.
opts := option.NewStartOpts(option.NoBackupSchedule)
opts.RoachprodOpts.StoreCount = v.disks
opts.RoachprodOpts.ExtraArgs = append(opts.RoachprodOpts.ExtraArgs,
fmt.Sprintf("--locality=region=fake-%d", (node-1)/nodesPerRegion))
v.Start(ctx, l, opts, install.MakeClusterSettings(), v.Node(node))
v.Start(ctx, t.L(), opts, install.MakeClusterSettings(), v.Node(node))
}
}

// waitForRebalanceToStop polls the system.rangelog every second to see if there
// have been any transfers in the last 5 seconds. It returns once the system
// stops transferring replicas.
func (v variations) waitForRebalanceToStop(ctx context.Context, l *logger.Logger) {
db := v.Conn(ctx, l, 1)
func (v variations) waitForRebalanceToStop(ctx context.Context, t test.Test) {
db := v.Conn(ctx, t.L(), 1)
defer db.Close()
q := `SELECT extract_duration(seconds FROM now()-timestamp) FROM system.rangelog WHERE "eventType" = 'add_voter' ORDER BY timestamp DESC LIMIT 1`

Expand All @@ -836,22 +817,23 @@ func (v variations) waitForRebalanceToStop(ctx context.Context, l *logger.Logger
if row := db.QueryRow(q); row != nil {
var secondsSinceLastEvent int
if err := row.Scan(&secondsSinceLastEvent); err != nil && !errors.Is(err, gosql.ErrNoRows) {
panic(err)
t.Fatal(err)
}
if secondsSinceLastEvent > 5 {
return
}
}
}
panic("retry should not have exited")
// This loop should never end until success or fatal.
t.FailNow()
}

// waitForIOOverloadToEnd polls the system.metrics every second to see if there
// is any IO overload on the target nodes. It returns once the overload ends.
func (v variations) waitForIOOverloadToEnd(ctx context.Context, l *logger.Logger) {
func (v variations) waitForIOOverloadToEnd(ctx context.Context, t test.Test) {
var dbs []*gosql.DB
for _, nodeId := range v.targetNodes() {
db := v.Conn(ctx, l, nodeId)
db := v.Conn(ctx, t.L(), nodeId)
defer db.Close()
dbs = append(dbs, db)
}
Expand All @@ -867,7 +849,7 @@ func (v variations) waitForIOOverloadToEnd(ctx context.Context, l *logger.Logger
if row := db.QueryRow(q); row != nil {
var overload float64
if err := row.Scan(&overload); err != nil && !errors.Is(err, gosql.ErrNoRows) {
panic(err)
t.Fatal(err)
}
if overload > 0.01 {
anyOverloaded = true
Expand All @@ -878,7 +860,8 @@ func (v variations) waitForIOOverloadToEnd(ctx context.Context, l *logger.Logger
return
}
}
panic("retry should not have exited")
// This loop should never end until success or fatal.
t.FailNow()
}

func (v variations) workloadNodes() option.NodeListOption {
Expand All @@ -899,16 +882,14 @@ func (v variations) targetNodes() option.NodeListOption {
// duration is the interval to measure over. Setting too short of an interval
// can mean inaccuracy in results. Setting too long of an interval may mean the
// impact is blurred out.
func (v variations) measureQPS(
ctx context.Context, t test.Test, l *logger.Logger, duration time.Duration,
) int {
func (v variations) measureQPS(ctx context.Context, t test.Test, duration time.Duration) int {
stableNodes := v.stableNodes()

totalOpsCompleted := func() int {
// NB: We can't hold the connection open during the full duration.
var dbs []*gosql.DB
for _, nodeId := range stableNodes {
db := v.Conn(ctx, l, nodeId)
db := v.Conn(ctx, t.L(), nodeId)
defer db.Close()
dbs = append(dbs, db)
}
Expand All @@ -928,10 +909,7 @@ func (v variations) measureQPS(
})
}

if err := group.Wait(); err != nil {
t.Fatal(err)
}

require.NoError(t, group.Wait())
return int(total)
}

Expand Down

0 comments on commit 29ad4d3

Please sign in to comment.