101447: c2c: randomize node shutdown timing in roachtests r=stevendanna a=msbutler

Previously, the c2c/nodeShutdown tests would only execute a shutdown after a high water mark was set and before a cutover timestamp was chosen. This patch randomizes the node shutdown to occur during the initial scan, during that steady-state phase, or during cutover.
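As a minimal sketch of the new selection logic (the c2cPhase enum and the rand.Intn call are lifted from the diff below; the standalone program wrapper is added for illustration):

package main

import (
	"fmt"
	"math/rand"
)

type c2cPhase int

const (
	phaseInitialScan c2cPhase = iota // 0
	phaseSteadyState                 // 1
	phaseCutover                     // 2
)

func main() {
	// Because phaseInitialScan is the zero value, rand.Intn(int(phaseCutover)+1)
	// == rand.Intn(3) picks each of the three phases with equal probability.
	phase := c2cPhase(rand.Intn(int(phaseCutover) + 1))
	fmt.Printf("shutdown scheduled for phase %d\n", phase)
}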

This patch also fixes an infra bug, introduced in #101220, that caused the /src shutdown tests to run on /dest and the /coordinator tests to run on /worker.

Informs: #89487

Release note: none

Co-authored-by: Michael Butler <[email protected]>
craig[bot] and msbutler committed Apr 14, 2023
2 parents 14615c5 + dcaf83b commit f13081c
Showing 1 changed file with 27 additions and 10 deletions: pkg/cmd/roachtest/tests/cluster_to_cluster.go
@@ -14,6 +14,7 @@ import (
 	"context"
 	gosql "database/sql"
 	"fmt"
+	"math/rand"
 	"net/url"
 	"os"
 	"path/filepath"
@@ -228,10 +229,19 @@ type replicateKV struct {
 
 	// This field is merely used to debug the c2c framework for finite workloads.
 	debugRunDurationMinutes int
+
+	// initDuration, if nonzero, will pre-populate the src cluster
+	initDurationMinutes int
 }
 
 func (kv replicateKV) sourceInitCmd(tenantName string, nodes option.NodeListOption) string {
-	return ""
+	if kv.initDurationMinutes == 0 {
+		return ""
+	}
+	return fmt.Sprintf(`./workload run kv --tolerate-errors --init --duration %dm --read-percent 0 {pgurl%s:%s}`,
+		kv.initDurationMinutes,
+		nodes,
+		tenantName)
 }
 
 func (kv replicateKV) sourceRunCmd(tenantName string, nodes option.NodeListOption) string {
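For illustration, here is roughly what the new sourceInitCmd produces. The node-list rendering (":1-3") and tenant name ("app") are assumptions; initDurationMinutes: 2 matches the resilience spec later in this diff:

package main

import "fmt"

func main() {
	// Assumed inputs: a NodeListOption that renders as ":1-3" and a tenant
	// named "app"; initDurationMinutes is 2, as in the resilience spec below.
	cmd := fmt.Sprintf(
		`./workload run kv --tolerate-errors --init --duration %dm --read-percent 0 {pgurl%s:%s}`,
		2, ":1-3", "app")
	fmt.Println(cmd)
	// Prints: ./workload run kv --tolerate-errors --init --duration 2m --read-percent 0 {pgurl:1-3:app}
}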
@@ -794,15 +804,15 @@ type c2cPhase int
 
 const (
 	phaseInitialScan c2cPhase = iota
-	steadyState
+	phaseSteadyState
 	phaseCutover
 )
 
 func (c c2cPhase) String() string {
 	switch c {
 	case phaseInitialScan:
 		return "Initial Scan"
-	case steadyState:
+	case phaseSteadyState:
 		return "Steady State"
 	case phaseCutover:
 		return "Cutover"
@@ -858,8 +868,8 @@ func makeReplResilienceDriver(
 	rd := makeReplicationDriver(t, c, rsp.replicationSpec)
 	return replResilienceDriver{
 		replicationDriver: rd,
-		// TODO(msbutler): randomly select a state to shutdown in.
-		phase:             steadyState,
+		phase:             c2cPhase(rand.Intn(int(phaseCutover) + 1)),
 		rsp:               rsp,
 	}
 }
 
@@ -937,9 +947,8 @@ func (rrd *replResilienceDriver) getPhase() c2cPhase {
 		return phaseInitialScan
 	}
 	if streamIngestProgress.CutoverTime.IsEmpty() {
-		return steadyState
+		return phaseSteadyState
 	}
-	// TODO check that job has not complete
 	return phaseCutover
 }
 
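Pieced together, the phase detection above amounts to the following sketch. Only the CutoverTime check and the return values appear in the hunk; the progress struct and its HighWater field are stand-ins for the stream ingestion job progress the driver actually inspects:

package c2c

type c2cPhase int

const (
	phaseInitialScan c2cPhase = iota
	phaseSteadyState
	phaseCutover
)

type timestamp struct{ wall int64 }

func (t timestamp) IsEmpty() bool { return t.wall == 0 }

// progress loosely mirrors the replication job's progress payload; the
// HighWater field name is an assumption, only CutoverTime appears in the diff.
type progress struct {
	HighWater   timestamp
	CutoverTime timestamp
}

func phaseOf(p progress) c2cPhase {
	if p.HighWater.IsEmpty() {
		return phaseInitialScan // no high water mark yet: still in the initial scan
	}
	if p.CutoverTime.IsEmpty() {
		return phaseSteadyState // replicating, but no cutover timestamp chosen
	}
	return phaseCutover
}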
@@ -951,6 +960,12 @@ func (rrd *replResilienceDriver) waitForTargetPhase() error {
 	case currentPhase < rrd.phase:
 		time.Sleep(5 * time.Second)
 	case currentPhase == rrd.phase:
+		// Every C2C phase should last at least 30 seconds, so introduce a little
+		// bit of random waiting before node shutdown to ensure the shutdown occurs
+		// once we're settled into the target phase.
+		randomSleep := time.Duration(rand.Intn(6))
+		rrd.t.L().Printf("In target phase! Take a %d second power nap", randomSleep)
+		time.Sleep(randomSleep * time.Second)
 		return nil
 	default:
 		return errors.New("c2c job past target phase")
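The hunk shows only the switch body. A sketch of the enclosing polling loop it implies (the for wrapper and the getPhase/target parameters are assumptions standing in for the replResilienceDriver fields):

package c2c

import (
	"errors"
	"math/rand"
	"time"
)

// waitForPhase sketches the loop implied by the diff: poll the job's current
// phase every 5 seconds until it reaches the target, settle in for a random
// 0-5 second nap, and error out if the job has already moved past the target.
func waitForPhase(getPhase func() c2cPhase, target c2cPhase) error {
	for {
		switch currentPhase := getPhase(); {
		case currentPhase < target:
			time.Sleep(5 * time.Second)
		case currentPhase == target:
			time.Sleep(time.Duration(rand.Intn(6)) * time.Second)
			return nil
		default:
			return errors.New("c2c job past target phase")
		}
	}
}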
@@ -984,10 +999,10 @@ func registerClusterReplicationResilience(r registry.Registry) {
 		srcNodes:           4,
 		dstNodes:           4,
 		cpus:               8,
-		workload:           replicateKV{readPercent: 0},
+		workload:           replicateKV{readPercent: 0, initDurationMinutes: 2},
 		timeout:            20 * time.Minute,
-		additionalDuration: 5 * time.Minute,
-		cutover:            4 * time.Minute,
+		additionalDuration: 6 * time.Minute,
+		cutover:            3 * time.Minute,
 		expectedNodeDeaths: 1,
 	}
 
@@ -1027,6 +1042,8 @@ func registerClusterReplicationResilience(r registry.Registry) {
 			rd.setup.dst.db = watcherDB
 			rd.setup.dst.sysSQL = watcherSQL
 		}
+		t.L().Printf(`%s configured: Shutdown Node %d; Watcher node %d; Gateway nodes %s`,
+			rrd.rsp.name(), rrd.shutdownNode, rrd.watcherNode, rrd.setup.gatewayNodes)
 	}
 	m := rrd.newMonitor(ctx)
 	m.Go(func(ctx context.Context) error {
