Skip to content

Commit

Permalink
c2c: deflake c2c/shutdown roachtests
Browse files Browse the repository at this point in the history
This patch addresses two roachtest failure modes:
- Prevents roachtest failure if a query fails during a node shutdown.

- Prevents the src cluster from returning a single node topology, which could
  cause the stream ingestion job to hang if the participating src node gets
shut down. Longer term, automatic replanning will prevent this. Specifically,
this patch changes the kv workload to split and scatter the kv table across the
cluster before the c2c job begins.

Fixes cockroachdb#101898
Fixes cockroachdb#102111

This patch also makes it easier to reproduce c2c roachtest failures by plumbing
a random seed to several components of the roachtest driver.

Release note: None
msbutler committed May 12, 2023

Verified

This commit was signed with the committer’s verified signature.
tombruijn Tom de Bruijn
1 parent 5379b10 commit 9813270
Showing 3 changed files with 95 additions and 50 deletions.
5 changes: 5 additions & 0 deletions pkg/cmd/roachtest/option/node_list_option.go
Original file line number Diff line number Diff line change
@@ -73,6 +73,11 @@ func (n NodeListOption) RandNode() NodeListOption {
return NodeListOption{n[rand.Intn(len(n))]}
}

// SeededRandNode returns a NodeListOption holding a single node picked
// pseudo-randomly from n. Unlike RandNode, the pick is drawn from the
// caller-supplied rng, so a test that logs its seed can reproduce the same
// choice on a later run.
//
// n must be non-empty: Intn panics when given a non-positive bound.
func (n NodeListOption) SeededRandNode(rng *rand.Rand) NodeListOption {
	// The parameter is deliberately not named "rand" so that it does not shadow
	// the rand package inside the body.
	return NodeListOption{n[rng.Intn(len(n))]}
}

// NodeIDsString returns the nodes in the NodeListOption, separated by spaces.
func (n NodeListOption) NodeIDsString() string {
result := ""
138 changes: 88 additions & 50 deletions pkg/cmd/roachtest/tests/cluster_to_cluster.go
Original file line number Diff line number Diff line change
@@ -34,11 +34,13 @@ import (
"github.com/cockroachdb/cockroach/pkg/multitenant/mtinfopb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
"github.com/cockroachdb/cockroach/pkg/roachprod/prometheus"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
@@ -228,33 +230,33 @@ type replicateKV struct {
readPercent int

// This field is merely used to debug the c2c framework for finite workloads.
debugRunDurationMinutes int
debugRunDuration time.Duration

// initDuration, if nonzero, will pre-populate the src cluster
initDurationMinutes int
// the number of rows inserted into the cluster before c2c begins
initRows int

// max size of raw data written during each insertion
maxBlockBytes int
}

func (kv replicateKV) sourceInitCmd(tenantName string, nodes option.NodeListOption) string {
if kv.initDurationMinutes == 0 {
return ""
}
return fmt.Sprintf(`./workload run kv --tolerate-errors --init --duration %dm --read-percent 0 {pgurl%s:%s}`,
kv.initDurationMinutes,
nodes,
tenantName)
cmd := roachtestutil.NewCommand(`./workload init kv`).
MaybeFlag(kv.initRows > 0, "insert-count", kv.initRows).
MaybeFlag(kv.initRows > 0, "max-block-bytes", kv.maxBlockBytes).
Flag("splits", 100).
Option("scatter").
Arg("{pgurl%s:%s}", nodes, tenantName)
return cmd.String()
}

func (kv replicateKV) sourceRunCmd(tenantName string, nodes option.NodeListOption) string {
debugDuration := ""
if kv.debugRunDurationMinutes != 0 {
debugDuration = fmt.Sprintf("--duration %dm", kv.debugRunDurationMinutes)
}
// added --tolerate-errors flags to prevent test from flaking due to a transaction retry error
return fmt.Sprintf(`./workload run kv --tolerate-errors --init %s --read-percent %d {pgurl%s:%s}`,
debugDuration,
kv.readPercent,
nodes,
tenantName)
cmd := roachtestutil.NewCommand(`./workload run kv`).
Option("tolerate-errors").
Flag("max-block-bytes", kv.maxBlockBytes).
Flag("read-percent", kv.readPercent).
MaybeFlag(kv.debugRunDuration > 0, "duration", kv.debugRunDuration).
Arg("{pgurl%s:%s}", nodes, tenantName)
return cmd.String()
}

func (kv replicateKV) runDriver(
@@ -335,13 +337,17 @@ type replicationDriver struct {
t test.Test
c cluster.Cluster
metrics *c2cMetrics
rng *rand.Rand
}

func makeReplicationDriver(t test.Test, c cluster.Cluster, rs replicationSpec) replicationDriver {
rng, seed := randutil.NewTestRand()
t.L().Printf(`Random Seed is %d`, seed)
return replicationDriver{
t: t,
c: c,
rs: rs,
t: t,
c: c,
rs: rs,
rng: rng,
}
}

@@ -364,8 +370,8 @@ func (rd *replicationDriver) setupC2C(ctx context.Context, t test.Test, c cluste
dstClusterSetting := install.MakeClusterSettings(install.SecureOption(true))
c.Start(ctx, t.L(), dstStartOps, dstClusterSetting, dstCluster)

srcNode := srcCluster.RandNode()
destNode := dstCluster.RandNode()
srcNode := srcCluster.SeededRandNode(rd.rng)
destNode := dstCluster.SeededRandNode(rd.rng)

addr, err := c.ExternalPGUrl(ctx, t.L(), srcNode, "")
require.NoError(t, err)
@@ -425,7 +431,12 @@ func (rd *replicationDriver) setupC2C(ctx context.Context, t test.Test, c cluste
WithNodeExporter(rd.setup.dst.nodes.InstallNodes()).
WithGrafanaDashboard("https://go.crdb.dev/p/changefeed-roachtest-grafana-dashboard")

require.NoError(rd.t, rd.c.StartGrafana(ctx, rd.t.L(), rd.setup.promCfg))
// StartGrafana clutters the test.log. Try logging setup to a separate file.
promLog, err := rd.t.L().ChildLogger("prom_setup", logger.QuietStderr, logger.QuietStdout)
if err != nil {
promLog = rd.t.L()
}
require.NoError(rd.t, rd.c.StartGrafana(ctx, promLog, rd.setup.promCfg))
rd.t.L().Printf("Prom has started")
}
}
@@ -524,13 +535,20 @@ func (rd *replicationDriver) getReplicationRetainedTime() time.Time {
return retainedTime
}

func (rd *replicationDriver) stopReplicationStream(ingestionJob int, cutoverTime time.Time) {
func (rd *replicationDriver) stopReplicationStream(
ctx context.Context, ingestionJob int, cutoverTime time.Time,
) {
rd.setup.dst.sysSQL.Exec(rd.t, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`, rd.setup.dst.name, cutoverTime)
err := retry.ForDuration(time.Minute*5, func() error {
var status string
var payloadBytes []byte
rd.setup.dst.sysSQL.QueryRow(rd.t, `SELECT status, payload FROM crdb_internal.system_jobs WHERE id = $1`,
ingestionJob).Scan(&status, &payloadBytes)
res := rd.setup.dst.sysSQL.DB.QueryRowContext(ctx, `SELECT status, payload FROM crdb_internal.system_jobs WHERE id = $1`, ingestionJob)
if res.Err() != nil {
// This query can fail if a node shuts down during the query execution;
// therefore, tolerate errors.
return res.Err()
}
require.NoError(rd.t, res.Scan(&status, &payloadBytes))
if jobs.Status(status) == jobs.StatusFailed {
payload := &jobspb.Payload{}
if err := protoutil.Unmarshal(payloadBytes, payload); err == nil {
@@ -590,6 +608,13 @@ func (rd *replicationDriver) main(ctx context.Context) {
metricSnapper := rd.startStatsCollection(ctx)
rd.preStreamingWorkload(ctx)

// Wait for initial workload to be properly replicated across the source cluster to increase
// the probability that the producer returns a topology with more than one node in it,
// else the node shutdown tests can flake.
if rd.rs.srcNodes >= 3 {
require.NoError(rd.t, WaitFor3XReplication(ctx, rd.t, rd.setup.src.db))
}

rd.t.L().Printf("begin workload on src cluster")
m := rd.newMonitor(ctx)
// The roachtest driver can use the workloadCtx to cancel the workload.
@@ -614,10 +639,12 @@ func (rd *replicationDriver) main(ctx context.Context) {
rd.t.Status("starting replication stream")
rd.metrics.initalScanStart = newMetricSnapshot(metricSnapper, timeutil.Now())
ingestionJobID := rd.startReplicationStream(ctx)

removeTenantRateLimiters(rd.t, rd.setup.dst.sysSQL, rd.setup.dst.name)

lv := makeLatencyVerifier("stream-ingestion", 0, 2*time.Minute, rd.t.L(), getStreamIngestionJobInfo, rd.t.Status, false)
// latency verifier queries may error during a node shutdown event; therefore
// tolerate errors if we anticipate node deaths.
lv := makeLatencyVerifier("stream-ingestion", 0, 2*time.Minute, rd.t.L(),
getStreamIngestionJobInfo, rd.t.Status, rd.rs.expectedNodeDeaths > 0)
defer lv.maybeLogLatencyHist()

m.Go(func(ctx context.Context) error {
@@ -652,7 +679,7 @@ func (rd *replicationDriver) main(ctx context.Context) {

rd.t.Status(fmt.Sprintf("waiting for replication stream to cutover to %s",
cutoverTime.String()))
rd.stopReplicationStream(ingestionJobID, cutoverTime)
rd.stopReplicationStream(ctx, ingestionJobID, cutoverTime)
rd.metrics.cutoverEnd = newMetricSnapshot(metricSnapper, timeutil.Now())

rd.metrics.export(rd.t, len(rd.setup.src.nodes))
@@ -700,7 +727,7 @@ func runAcceptanceClusterReplication(ctx context.Context, t test.Test, c cluster
dstNodes: 1,
// The timeout field ensures the c2c roachtest driver behaves properly.
timeout: 10 * time.Minute,
workload: replicateKV{readPercent: 0, debugRunDurationMinutes: 1},
workload: replicateKV{readPercent: 0, debugRunDuration: 1 * time.Minute, maxBlockBytes: 1},
additionalDuration: 0 * time.Minute,
cutover: 30 * time.Second,
}
@@ -747,18 +774,19 @@ func registerClusterToCluster(r registry.Registry) {
dstNodes: 3,
cpus: 8,
pdSize: 100,
workload: replicateKV{readPercent: 0},
workload: replicateKV{readPercent: 0, maxBlockBytes: 1024},
timeout: 1 * time.Hour,
additionalDuration: 10 * time.Minute,
cutover: 5 * time.Minute,
},
{
name: "c2c/UnitTest",
srcNodes: 1,
dstNodes: 1,
cpus: 4,
pdSize: 10,
workload: replicateKV{readPercent: 0, debugRunDurationMinutes: 1},
name: "c2c/UnitTest",
srcNodes: 1,
dstNodes: 1,
cpus: 4,
pdSize: 10,
workload: replicateKV{readPercent: 0, debugRunDuration: 1 * time.Minute,
maxBlockBytes: 1024},
timeout: 5 * time.Minute,
additionalDuration: 0 * time.Minute,
cutover: 30 * time.Second,
@@ -864,7 +892,7 @@ func makeReplResilienceDriver(
rd := makeReplicationDriver(t, c, rsp.replicationSpec)
return replResilienceDriver{
replicationDriver: rd,
phase: c2cPhase(rand.Intn(int(phaseCutover) + 1)),
phase: c2cPhase(rd.rng.Intn(int(phaseCutover) + 1)),
rsp: rsp,
}
}
@@ -919,7 +947,7 @@ func (rrd *replResilienceDriver) getTargetAndWatcherNodes(ctx context.Context) {

findAnotherNode := func(notThisNode int) int {
for {
anotherNode := nodes.RandNode()[0]
anotherNode := nodes.SeededRandNode(rrd.rng)[0]
if notThisNode != anotherNode {
return anotherNode
}
@@ -956,19 +984,24 @@ func (rrd *replResilienceDriver) waitForTargetPhase() error {
case currentPhase < rrd.phase:
time.Sleep(5 * time.Second)
case currentPhase == rrd.phase:
// Every C2C phase should last at least 30 seconds, so introduce a little
// bit of random waiting before node shutdown to ensure the shutdown occurs
// once we're settled into the target phase.
randomSleep := time.Duration(rand.Intn(6))
rrd.t.L().Printf("In target phase! Take a %d second power nap", randomSleep)
time.Sleep(randomSleep * time.Second)
rrd.t.L().Printf("In target phase %s", currentPhase.String())
return nil
default:
return errors.New("c2c job past target phase")
}
}
}

// sleepBeforeResiliencyEvent blocks for a pseudo-random 5 to 10 seconds, drawn
// from the driver's seeded rng, and logs the chosen duration first.
//
// The pause is meant to run just before a resiliency event (e.g. a node
// shutdown). Assuming every C2C phase lasts at least 10 seconds, waiting here
// lets the event land once the job is fully settled into the target phase
// (e.g. the stream ingestion processors have observed the cutover signal).
func (rrd *replResilienceDriver) sleepBeforeResiliencyEvent() {
	// Lower bound of the nap, in seconds; up to 5 more seconds are added on top.
	const minNapSeconds = 5
	extraSeconds := rrd.rng.Intn(6)
	napLength := time.Duration(minNapSeconds+extraSeconds) * time.Second
	rrd.t.L().Printf("Take a %s power nap", napLength)
	time.Sleep(napLength)
}

func registerClusterReplicationResilience(r registry.Registry) {
for _, rsp := range []replResilienceSpec{
{
@@ -995,7 +1028,7 @@ func registerClusterReplicationResilience(r registry.Registry) {
srcNodes: 4,
dstNodes: 4,
cpus: 8,
workload: replicateKV{readPercent: 0, initDurationMinutes: 2},
workload: replicateKV{readPercent: 0, initRows: 1000000, maxBlockBytes: 1024},
timeout: 20 * time.Minute,
additionalDuration: 6 * time.Minute,
cutover: 3 * time.Minute,
@@ -1050,14 +1083,19 @@ func registerClusterReplicationResilience(r registry.Registry) {
// Don't begin shutdown process until c2c job is set up.
<-shutdownSetupDone

// Eagerly listen to cutover signal to exercise node shutdown during actual cutover.
rrd.setup.dst.sysSQL.Exec(t, `SET CLUSTER SETTING bulkio.stream_ingestion.cutover_signal_poll_interval='5s'`)

// While executing a node shutdown on either the src or destination
// cluster, ensure the destination cluster's stream ingestion job
// completes. If the stream producer job fails, no big deal-- in a real
// DR scenario the src cluster may have gone belly up during a
// successful c2c replication execution.
shutdownStarter := func() jobStarter {
return func(c cluster.Cluster, t test.Test) (string, error) {
return fmt.Sprintf("%d", rrd.dstJobID), rrd.waitForTargetPhase()
require.NoError(t, rrd.waitForTargetPhase())
rrd.sleepBeforeResiliencyEvent()
return fmt.Sprintf("%d", rrd.dstJobID), nil
}
}
destinationWatcherNode := rrd.watcherNode
2 changes: 2 additions & 0 deletions pkg/cmd/roachtest/tests/multitenant_utils.go
Original file line number Diff line number Diff line change
@@ -338,6 +338,8 @@ func createInMemoryTenant(
sysSQL.Exec(t, "CREATE TENANT $1", tenantName)
sysSQL.Exec(t, "ALTER TENANT $1 START SERVICE SHARED", tenantName)
sysSQL.Exec(t, `ALTER TENANT $1 GRANT CAPABILITY can_view_node_info=true, can_admin_split=true,can_view_tsdb_metrics=true`, tenantName)
sysSQL.Exec(t, `ALTER TENANT $1 SET CLUSTER SETTING sql.split_at.allow_for_secondary_tenant.enabled=true`, tenantName)
sysSQL.Exec(t, `ALTER TENANT $1 SET CLUSTER SETTING sql.scatter.allow_for_secondary_tenant.enabled=true`, tenantName)

removeTenantRateLimiters(t, sysSQL, tenantName)

0 comments on commit 9813270

Please sign in to comment.