Skip to content


c2c: increase c2c roachtest workload flexibility
Browse files Browse the repository at this point in the history
Previously in c2c roachtests, the foreground workload on the src cluster would
run for a predefined amount of time, based on the expected initial scan time.
But, if this estimated initial scan time wasn't accurate, the roachtest would
not properly simulate c2c customer workload. E.g. if the initial scan actually
took much longer than expected, the workload would finish before the initial

This patch removes the need to specify a duration for the src cluster workload.
Instead, the goroutine running the workload will get cancelled at cutover time,
determined by the `replicationTestSpec.additionalDuration` field, which
specifies how long the workload should after the initial scan completes.

This patch also adds additional logging which provides instructions for opening
a sql session to the tenant and opening a tenant's dbconsole.

Informs cockroachdb#89176

Release note: None
  • Loading branch information
msbutler committed Jan 19, 2023
1 parent 1d2ba22 commit 4cf7a4f
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 54 deletions.
142 changes: 88 additions & 54 deletions pkg/cmd/roachtest/tests/cluster_to_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,14 @@ func newMetric(metric float64, unit string) exportedMetric {
return exportedMetric{metric, unit}

func (em exportedMetric) String() string {
func (em exportedMetric) StringWithUnits() string {
return fmt.Sprintf("%.2f %s", em.metric, em.unit)

func (em exportedMetric) String() string {
return fmt.Sprintf("%.2f", em.metric)

// sizeTime captures the disk size of the nodes at some moment in time
type sizeTime struct {
// size is the megabytes of the objects
Expand Down Expand Up @@ -236,6 +240,7 @@ func setupC2C(
srcTenant.stop(ctx, t, c)

srcTenantInfo := clusterInfo{
name: srcTenantName,
ID: srcTenantID,
Expand All @@ -258,8 +263,19 @@ func setupC2C(
srcTenantInfo.sql.Exec(t, `SELECT crdb_internal.update_tenant_resource_limits($1, 10000000000, 0,
10000000000, now(), 0);`, srcTenantInfo.ID)

createSystemRole(t,, srcTenantInfo.sql)
createSystemRole(t,, destTenantInfo.sql)
createSystemRole(t," system tenant", srcTenantInfo.sql)
createSystemRole(t," system tenant", destTenantInfo.sql)

tenantConn, err := srcTenant.conn()
require.NoError(t, err)
createSystemRole(t," app tenant", sqlutils.MakeSQLRunner(tenantConn))
t.L().Printf(`To open a sql session on the app tenant, ssh to the tenant node and run:
./cockroach sql url="%s"`, srcTenant.secureURL())
t.L().Printf(`To open the app tenant's db console, run:
1. roachprod adminui $CLUSTER:%d
2. change the port in the url to %d
`, srcTenantNode, tenantHTTPPort)

return &c2cSetup{
src: srcTenantInfo,
dst: destTenantInfo,
Expand All @@ -272,7 +288,7 @@ func createSystemRole(t test.Test, name string, sql *sqlutils.SQLRunner) {
password := "roach"
sql.Exec(t, fmt.Sprintf(`CREATE ROLE %s WITH LOGIN PASSWORD '%s'`, username, password))
sql.Exec(t, fmt.Sprintf(`GRANT ADMIN TO %s`, username))
t.L().Printf(`Log into the %s system tenant db console with username "%s" and password "%s"`,
t.L().Printf(`Log into the %s db console with username "%s" and password "%s"`,
name, username, password)

Expand All @@ -281,9 +297,8 @@ type streamingWorkload interface {
// replication stream begins
sourceInitCmd(pgURL string) string

// sourceRunCmd returns a command that will run a workload for the given duration on the src
// cluster during the replication stream.
sourceRunCmd(pgURL string, duration time.Duration) string
// sourceRunCmd returns a command that will run a workload
sourceRunCmd(pgURL string) string

type replicateTPCC struct {
Expand All @@ -295,10 +310,10 @@ func (tpcc replicateTPCC) sourceInitCmd(pgURL string) string {
tpcc.warehouses, pgURL)

func (tpcc replicateTPCC) sourceRunCmd(pgURL string, duration time.Duration) string {
func (tpcc replicateTPCC) sourceRunCmd(pgURL string) string {
// added --tolerate-errors flags to prevent test from flaking due to a transaction retry error
return fmt.Sprintf(`./workload run tpcc --warehouses %d --duration %dm --tolerate-errors '%s'`,
tpcc.warehouses, int(duration.Minutes()), pgURL)
return fmt.Sprintf(`./workload run tpcc --warehouses %d --tolerate-errors '%s'`,
tpcc.warehouses, pgURL)

type replicateKV struct {
Expand All @@ -309,10 +324,9 @@ func (kv replicateKV) sourceInitCmd(pgURL string) string {
return ""

func (kv replicateKV) sourceRunCmd(pgURL string, duration time.Duration) string {
func (kv replicateKV) sourceRunCmd(pgURL string) string {
// added --tolerate-errors flags to prevent test from flaking due to a transaction retry error
return fmt.Sprintf(`./workload run kv --tolerate-errors --init --duration %dm --read-percent %d '%s'`,
return fmt.Sprintf(`./workload run kv --tolerate-errors --init --read-percent %d '%s'`,
Expand Down Expand Up @@ -349,7 +363,7 @@ type replicationTestSpec struct {
func registerClusterToCluster(r registry.Registry) {
for _, sp := range []replicationTestSpec{
name: "c2c/tpcc",
name: "c2c/tpcc/warehouses=500/duration=10/cutover=5",
srcKVNodes: 4,
dstKVNodes: 4,
cpus: 8,
Expand All @@ -362,12 +376,13 @@ func registerClusterToCluster(r registry.Registry) {
timeout: 1 * time.Hour,
additionalDuration: 10 * time.Minute,
cutover: 5 * time.Minute,
}, {
name: "c2c/kv0",
srcKVNodes: 3,
dstKVNodes: 3,
cpus: 8,
pdSize: 1000,
pdSize: 100,
workload: replicateKV{readPercent: 0},
timeout: 1 * time.Hour,
additionalDuration: 10 * time.Minute,
Expand All @@ -385,11 +400,16 @@ func registerClusterToCluster(r registry.Registry) {
Owner: registry.OwnerDisasterRecovery,
Cluster: r.MakeClusterSpec(sp.dstKVNodes+sp.srcKVNodes+1, clusterOps...),
Timeout: sp.timeout,
RequiresLicense: true,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {

setup, cleanup := setupC2C(ctx, t, c, sp.srcKVNodes, sp.dstKVNodes)
defer cleanup()

t.L().Printf("tenant secureURL: %s; http port: %s; sql port: %s",
setup.src.tenant.secureURL(), setup.src.tenant.httpPort, setup.src.tenant.sqlPort)

m := c.NewMonitor(ctx, setup.src.kvNodes.Merge(setup.dst.kvNodes))
du, err := NewDiskUsageTracker(c, t.L())
require.NoError(t, err)
Expand All @@ -401,33 +421,33 @@ func registerClusterToCluster(r registry.Registry) {
setup.metrics.initialScanEnd = newSizeTime(ctx, du, setup.src.kvNodes)

initDuration = setup.metrics.initialScanEnd.time.Sub(setup.metrics.start.time)
t.L().Printf("src cluster workload initialization took %d minutes", int(initDuration.Minutes()))
t.L().Printf("src cluster workload initialization took %s minutes", initDuration)

t.Status("starting replication stream")
streamReplStmt := fmt.Sprintf("CREATE TENANT %q FROM REPLICATION OF %q ON '%s'",,, setup.src.pgURL)
setup.dst.sql.Exec(t, streamReplStmt)
ingestionJobID := getIngestionJobID(t, setup.dst.sql,
t.L().Printf("begin workload on src cluster")
workloadCtx, workloadCancel := context.WithCancel(ctx)
defer workloadCancel()

// The replication stream is expected to spend some time conducting an
// initial scan, ideally on the same order as the `initDuration`, the
// time taken to initially populate the source cluster. To ensure the
// latency verifier stabilizes, ensure the workload and the replication
// stream run for a significant amount of time after the initial scan
// ends. Explicitly, set the workload to run for the estimated initial scan
// runtime + the user specified workload duration.
workloadDuration := initDuration + sp.additionalDuration
workloadDoneCh := make(chan struct{})
m.Go(func(ctx context.Context) error {
defer close(workloadDoneCh)
cmd := sp.workload.sourceRunCmd(setup.src.tenant.secureURL(), workloadDuration)
c.Run(ctx, c.Node(setup.src.sqlNode), cmd)
err := c.RunE(workloadCtx, c.Node(setup.src.sqlNode),
// The workload should only stop if the workloadCtx is cancelled once
// sp.additionalDuration has elapsed after the initial scan completes.
if workloadCtx.Err() == nil {
// Implies the workload context was not cancelled and the workload cmd returned on
// its own.
return errors.Wrapf(err, `Workload context was not cancelled. Error returned by workload cmd`)
workloadDoneCh <- struct{}{}
return nil

cutoverTime := chooseCutover(t, setup.dst.sql, workloadDuration, sp.cutover)
t.Status("cutover time chosen: %s", cutoverTime.String())
t.Status("starting replication stream")
streamReplStmt := fmt.Sprintf("CREATE TENANT %q FROM REPLICATION OF %q ON '%s'",,, setup.src.pgURL)
setup.dst.sql.Exec(t, streamReplStmt)
ingestionJobID := getIngestionJobID(t, setup.dst.sql,

// TODO(ssd): The job doesn't record the initial
// statement time, so we can't correctly measure the
Expand All @@ -439,17 +459,45 @@ func registerClusterToCluster(r registry.Registry) {
return lv.pollLatency(ctx, setup.dst.db, ingestionJobID, time.Second, workloadDoneCh)

t.Status("waiting for replication stream to finish ingesting initial scan")
t.L().Printf("waiting for replication stream to finish ingesting initial scan")
waitForHighWatermark(t, setup.dst.db, ingestionJobID, sp.timeout/2)

t.Status("waiting for src cluster workload to complete")

t.Status("waiting for replication stream to cutover")
t.Status(fmt.Sprintf(`initial scan complete. run workload and repl. stream for another %s minutes`,

var currentTime time.Time
setup.dst.sql.QueryRow(t, "SELECT clock_timestamp()").Scan(&currentTime)
cutoverTime := currentTime.Add(sp.additionalDuration - sp.cutover)
t.Status("cutover time chosen: ", cutoverTime.String())

select {
case <-time.After(sp.additionalDuration):
t.L().Printf("workload has finished after %s", sp.additionalDuration)
case <-ctx.Done():
t.L().Printf(`roachtest context cancelled while waiting for workload duration to complete`)
t.Status(fmt.Sprintf("waiting for replication stream to cutover to %s", cutoverTime.String()))
setup.metrics.cutoverStart = newSizeTime(ctx, du, setup.dst.kvNodes)
stopReplicationStream(t, setup.dst.sql, ingestionJobID, cutoverTime)
setup.metrics.cutoverEnd = newSizeTime(ctx, du, setup.dst.kvNodes)

// TODO(msbutler): export metrics to roachperf or prom/grafana
exportedMetrics := setup.metrics.export()
t.L().Printf(`Initial Scan: Duration, Size, Throughput; Cutover: Duration, Size, Throughput`)
t.L().Printf(`%s %s %s %s %s %s`,
for key, metric := range exportedMetrics {
t.L().Printf("%s: %s", key, metric.StringWithUnits())

t.Status("comparing fingerprints")
// Currently, it takes about 15 minutes to generate a fingerprint for
// about 30 GB of data. Once the fingerprinting job is used instead,
Expand All @@ -464,12 +512,6 @@ func registerClusterToCluster(r registry.Registry) {
hlc.Timestamp{WallTime: cutoverTime.UnixNano()})

// TODO(msbutler): export metrics to roachperf or prom/grafana
exportedMetrics := setup.metrics.export()
for key, metric := range exportedMetrics {
t.L().Printf("%s: %s", key, metric.String())
Expand All @@ -483,14 +525,6 @@ func getIngestionJobID(t test.Test, dstSQL *sqlutils.SQLRunner, dstTenantName st
return int(tenantInfo.TenantReplicationJobID)

func chooseCutover(
t test.Test, dstSQL *sqlutils.SQLRunner, workloadDuration time.Duration, cutover time.Duration,
) time.Time {
var currentTime time.Time
dstSQL.QueryRow(t, "SELECT clock_timestamp()").Scan(&currentTime)
return currentTime.Add(workloadDuration - cutover)

func compareTenantFingerprintsAtTimestamp(
t test.Test, m cluster.Monitor, setup *c2cSetup, ts hlc.Timestamp,
) {
Expand Down
5 changes: 5 additions & 0 deletions pkg/cmd/roachtest/tests/multitenant_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,11 @@ func (tn *tenantNode) start(ctx context.Context, t test.Test, c cluster.Cluster,
t.L().Printf("sql server for tenant %d (instance %d) now running", tn.tenantID, tn.instanceID)

// conn returns a sql connection to the tenant
func (tn *tenantNode) conn() (*gosql.DB, error) {
return gosql.Open("postgres", tn.pgURL)

func startTenantServer(
tenantCtx context.Context,
c cluster.Cluster,
Expand Down

0 comments on commit 4cf7a4f

Please sign in to comment.