diff --git a/pkg/cmd/roachtest/tests/cluster_to_cluster.go b/pkg/cmd/roachtest/tests/cluster_to_cluster.go
index 3d1a022213f1..b9f2156f8d32 100644
--- a/pkg/cmd/roachtest/tests/cluster_to_cluster.go
+++ b/pkg/cmd/roachtest/tests/cluster_to_cluster.go
@@ -122,10 +122,14 @@ func newMetric(metric float64, unit string) exportedMetric {
 	return exportedMetric{metric, unit}
 }
-func (em exportedMetric) String() string {
+func (em exportedMetric) StringWithUnits() string {
 	return fmt.Sprintf("%.2f %s", em.metric, em.unit)
 }
+func (em exportedMetric) String() string {
+	return fmt.Sprintf("%.2f", em.metric)
+}
+
 // sizeTime captures the disk size of the nodes at some moment in time
 type sizeTime struct {
 	// size is the megabytes of the objects
@@ -236,6 +240,7 @@ func setupC2C(
 		srcTenant.stop(ctx, t, c)
 		pgCleanup()
 	}
+
 	srcTenantInfo := clusterInfo{
 		name: srcTenantName,
 		ID:   srcTenantID,
@@ -258,8 +263,19 @@ func setupC2C(
 	srcTenantInfo.sql.Exec(t, `SELECT crdb_internal.update_tenant_resource_limits($1, 10000000000, 0, 10000000000, now(), 0);`, srcTenantInfo.ID)
-	createSystemRole(t, srcTenantInfo.name, srcTenantInfo.sql)
-	createSystemRole(t, destTenantInfo.name, destTenantInfo.sql)
+	createSystemRole(t, srcTenantInfo.name+" system tenant", srcTenantInfo.sql)
+	createSystemRole(t, destTenantInfo.name+" system tenant", destTenantInfo.sql)
+
+	tenantConn, err := srcTenant.conn()
+	require.NoError(t, err)
+	createSystemRole(t, srcTenantInfo.name+" app tenant", sqlutils.MakeSQLRunner(tenantConn))
+	t.L().Printf(`To open a sql session on the app tenant, ssh to the tenant node and run:
+	./cockroach sql --url="%s"`, srcTenant.secureURL())
+	t.L().Printf(`To open the app tenant's db console, run:
+	1. roachprod adminui $CLUSTER:%d
+	2. change the port in the url to %d
+`, srcTenantNode, tenantHTTPPort)
+
 	return &c2cSetup{
 		src: srcTenantInfo,
 		dst: destTenantInfo,
@@ -272,7 +288,7 @@ func createSystemRole(t test.Test, name string, sql *sqlutils.SQLRunner) {
 	password := "roach"
 	sql.Exec(t, fmt.Sprintf(`CREATE ROLE %s WITH LOGIN PASSWORD '%s'`, username, password))
 	sql.Exec(t, fmt.Sprintf(`GRANT ADMIN TO %s`, username))
-	t.L().Printf(`Log into the %s system tenant db console with username "%s" and password "%s"`,
+	t.L().Printf(`Log into the %s db console with username "%s" and password "%s"`,
 		name, username, password)
 }
@@ -281,9 +297,8 @@ type streamingWorkload interface {
 	// replication stream begins
 	sourceInitCmd(pgURL string) string
-	// sourceRunCmd returns a command that will run a workload for the given duration on the src
-	// cluster during the replication stream.
-	sourceRunCmd(pgURL string, duration time.Duration) string
+	// sourceRunCmd returns a command that will run a workload
+	sourceRunCmd(pgURL string) string
 }
 type replicateTPCC struct {
@@ -295,10 +310,10 @@ func (tpcc replicateTPCC) sourceInitCmd(pgURL string) string {
 		tpcc.warehouses, pgURL)
 }
-func (tpcc replicateTPCC) sourceRunCmd(pgURL string, duration time.Duration) string {
+func (tpcc replicateTPCC) sourceRunCmd(pgURL string) string {
 	// added --tolerate-errors flags to prevent test from flaking due to a transaction retry error
-	return fmt.Sprintf(`./workload run tpcc --warehouses %d --duration %dm --tolerate-errors '%s'`,
-		tpcc.warehouses, int(duration.Minutes()), pgURL)
+	return fmt.Sprintf(`./workload run tpcc --warehouses %d --tolerate-errors '%s'`,
+		tpcc.warehouses, pgURL)
 }
 type replicateKV struct {
@@ -309,10 +324,9 @@ func (kv replicateKV) sourceInitCmd(pgURL string) string {
 	return ""
 }
-func (kv replicateKV) sourceRunCmd(pgURL string, duration time.Duration) string {
+func (kv replicateKV) sourceRunCmd(pgURL string) string {
 	// added --tolerate-errors flags to prevent test from flaking due to a transaction retry error
-	return fmt.Sprintf(`./workload run kv --tolerate-errors --init --duration %dm --read-percent %d '%s'`,
-		int(duration.Minutes()),
+	return fmt.Sprintf(`./workload run kv --tolerate-errors --init --read-percent %d '%s'`,
 		kv.readPercent, pgURL)
 }
@@ -349,7 +363,7 @@ type replicationTestSpec struct {
 func registerClusterToCluster(r registry.Registry) {
 	for _, sp := range []replicationTestSpec{
 		{
-			name:       "c2c/tpcc",
+			name:       "c2c/tpcc/warehouses=500/duration=10/cutover=5",
 			srcKVNodes: 4,
 			dstKVNodes: 4,
 			cpus:       8,
@@ -362,12 +376,13 @@ func registerClusterToCluster(r registry.Registry) {
 			timeout:            1 * time.Hour,
 			additionalDuration: 10 * time.Minute,
 			cutover:            5 * time.Minute,
-		}, {
+		},
+		{
 			name:       "c2c/kv0",
 			srcKVNodes: 3,
 			dstKVNodes: 3,
 			cpus:       8,
-			pdSize:     1000,
+			pdSize:     100,
 			workload:   replicateKV{readPercent: 0},
 			timeout:    1 * time.Hour,
 			additionalDuration: 10 * time.Minute,
@@ -385,11 +400,16 @@ func registerClusterToCluster(r registry.Registry) {
 			Name:            sp.name,
 			Owner:           registry.OwnerDisasterRecovery,
 			Cluster:         r.MakeClusterSpec(sp.dstKVNodes+sp.srcKVNodes+1, clusterOps...),
+			Timeout:         sp.timeout,
 			RequiresLicense: true,
 			Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
 				setup, cleanup := setupC2C(ctx, t, c, sp.srcKVNodes, sp.dstKVNodes)
 				defer cleanup()
+
+				t.L().Printf("tenant secureURL: %s; http port: %s; sql port: %s",
+					setup.src.tenant.secureURL(), setup.src.tenant.httpPort, setup.src.tenant.sqlPort)
+
 				m := c.NewMonitor(ctx, setup.src.kvNodes.Merge(setup.dst.kvNodes))
 				du, err := NewDiskUsageTracker(c, t.L())
 				require.NoError(t, err)
@@ -401,33 +421,33 @@ func registerClusterToCluster(r registry.Registry) {
 					setup.metrics.initialScanEnd = newSizeTime(ctx, du, setup.src.kvNodes)
 					initDuration = setup.metrics.initialScanEnd.time.Sub(setup.metrics.start.time)
-					t.L().Printf("src cluster workload initialization took %d minutes", int(initDuration.Minutes()))
+					t.L().Printf("src cluster workload initialization took %s", initDuration)
 				}
-				t.Status("starting replication stream")
-				streamReplStmt := fmt.Sprintf("CREATE TENANT %q FROM REPLICATION OF %q ON '%s'",
-					setup.dst.name, setup.src.name, setup.src.pgURL)
-				setup.dst.sql.Exec(t, streamReplStmt)
-				ingestionJobID := getIngestionJobID(t, setup.dst.sql, setup.dst.name)
+				t.L().Printf("begin workload on src cluster")
+				workloadCtx, workloadCancel := context.WithCancel(ctx)
+				defer workloadCancel()
-				// The replication stream is expected to spend some time conducting an
-				// initial scan, ideally on the same order as the `initDuration`, the
-				// time taken to initially populate the source cluster. To ensure the
-				// latency verifier stabilizes, ensure the workload and the replication
-				// stream run for a significant amount of time after the initial scan
-				// ends. Explicitly, set the workload to run for the estimated initial scan
-				// runtime + the user specified workload duration.
-				workloadDuration := initDuration + sp.additionalDuration
 				workloadDoneCh := make(chan struct{})
 				m.Go(func(ctx context.Context) error {
-					defer close(workloadDoneCh)
-					cmd := sp.workload.sourceRunCmd(setup.src.tenant.secureURL(), workloadDuration)
-					c.Run(ctx, c.Node(setup.src.sqlNode), cmd)
+					err := c.RunE(workloadCtx, c.Node(setup.src.sqlNode),
+						sp.workload.sourceRunCmd(setup.src.tenant.secureURL()))
+					// The workload should only stop if the workloadCtx is cancelled once
+					// sp.additionalDuration has elapsed after the initial scan completes.
+					if workloadCtx.Err() == nil {
+						// Implies the workload context was not cancelled and the workload cmd returned on
+						// its own.
+						return errors.Wrapf(err, `Workload context was not cancelled. Error returned by workload cmd`)
+					}
+					workloadDoneCh <- struct{}{}
 					return nil
 				})
-				cutoverTime := chooseCutover(t, setup.dst.sql, workloadDuration, sp.cutover)
-				t.Status("cutover time chosen: %s", cutoverTime.String())
+				t.Status("starting replication stream")
+				streamReplStmt := fmt.Sprintf("CREATE TENANT %q FROM REPLICATION OF %q ON '%s'",
+					setup.dst.name, setup.src.name, setup.src.pgURL)
+				setup.dst.sql.Exec(t, streamReplStmt)
+				ingestionJobID := getIngestionJobID(t, setup.dst.sql, setup.dst.name)
 				// TODO(ssd): The job doesn't record the initial
 				// statement time, so we can't correctly measure the
@@ -439,17 +459,45 @@ func registerClusterToCluster(r registry.Registry) {
 					return lv.pollLatency(ctx, setup.dst.db, ingestionJobID, time.Second, workloadDoneCh)
 				})
-				t.Status("waiting for replication stream to finish ingesting initial scan")
+				t.L().Printf("waiting for replication stream to finish ingesting initial scan")
 				waitForHighWatermark(t, setup.dst.db, ingestionJobID, sp.timeout/2)
-				t.Status("waiting for src cluster workload to complete")
-				m.Wait()
-
-				t.Status("waiting for replication stream to cutover")
+				t.Status(fmt.Sprintf(`initial scan complete. run workload and repl.
+stream for another %s`,
+					sp.additionalDuration))
+
+				var currentTime time.Time
+				setup.dst.sql.QueryRow(t, "SELECT clock_timestamp()").Scan(&currentTime)
+				cutoverTime := currentTime.Add(sp.additionalDuration - sp.cutover)
+				t.Status("cutover time chosen: ", cutoverTime.String())
+
+				select {
+				case <-time.After(sp.additionalDuration):
+					workloadCancel()
+					t.L().Printf("workload has finished after %s", sp.additionalDuration)
+				case <-ctx.Done():
+					t.L().Printf(`roachtest context cancelled while waiting for workload duration to complete`)
+					return
+				}
+				t.Status(fmt.Sprintf("waiting for replication stream to cutover to %s", cutoverTime.String()))
 				setup.metrics.cutoverStart = newSizeTime(ctx, du, setup.dst.kvNodes)
 				stopReplicationStream(t, setup.dst.sql, ingestionJobID, cutoverTime)
 				setup.metrics.cutoverEnd = newSizeTime(ctx, du, setup.dst.kvNodes)
+				// TODO(msbutler): export metrics to roachperf or prom/grafana
+				exportedMetrics := setup.metrics.export()
+				t.L().Printf(`Initial Scan: Duration, Size, Throughput; Cutover: Duration, Size, Throughput`)
+				t.L().Printf(`%s %s %s %s %s %s`,
+					exportedMetrics["InitialScanDuration"].String(),
+					exportedMetrics["InitialScanSize"].String(),
+					exportedMetrics["InitialScanThroughput"].String(),
+					exportedMetrics["CutoverDuration"].String(),
+					exportedMetrics["CutoverSize"].String(),
+					exportedMetrics["CutoverThroughput"].String(),
+				)
+				for key, metric := range exportedMetrics {
+					t.L().Printf("%s: %s", key, metric.StringWithUnits())
+				}
+
 				t.Status("comparing fingerprints")
 				// Currently, it takes about 15 minutes to generate a fingerprint for
 				// about 30 GB of data. Once the fingerprinting job is used instead,
@@ -464,12 +512,6 @@ func registerClusterToCluster(r registry.Registry) {
 					setup, hlc.Timestamp{WallTime: cutoverTime.UnixNano()})
 				lv.assertValid(t)
-
-				// TODO(msbutler): export metrics to roachperf or prom/grafana
-				exportedMetrics := setup.metrics.export()
-				for key, metric := range exportedMetrics {
-					t.L().Printf("%s: %s", key, metric.String())
-				}
 			},
 		})
 	}
@@ -483,14 +525,6 @@ func getIngestionJobID(t test.Test, dstSQL *sqlutils.SQLRunner, dstTenantName st
 	return int(tenantInfo.TenantReplicationJobID)
 }
-func chooseCutover(
-	t test.Test, dstSQL *sqlutils.SQLRunner, workloadDuration time.Duration, cutover time.Duration,
-) time.Time {
-	var currentTime time.Time
-	dstSQL.QueryRow(t, "SELECT clock_timestamp()").Scan(&currentTime)
-	return currentTime.Add(workloadDuration - cutover)
-}
-
 func compareTenantFingerprintsAtTimestamp(
 	t test.Test, m cluster.Monitor, setup *c2cSetup, ts hlc.Timestamp,
 ) {
diff --git a/pkg/cmd/roachtest/tests/multitenant_utils.go b/pkg/cmd/roachtest/tests/multitenant_utils.go
index dbb39f4fe0b3..0f2774081abf 100644
--- a/pkg/cmd/roachtest/tests/multitenant_utils.go
+++ b/pkg/cmd/roachtest/tests/multitenant_utils.go
@@ -251,6 +251,11 @@ func (tn *tenantNode) start(ctx context.Context, t test.Test, c cluster.Cluster,
 	t.L().Printf("sql server for tenant %d (instance %d) now running", tn.tenantID, tn.instanceID)
 }
+// conn returns a sql connection to the tenant
+func (tn *tenantNode) conn() (*gosql.DB, error) {
+	return gosql.Open("postgres", tn.pgURL)
+}
+
 func startTenantServer(
 	tenantCtx context.Context, c cluster.Cluster,
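
The control-flow change above swaps the workload's fixed --duration flag for an open-ended workload that the test itself cancels once sp.additionalDuration has elapsed after the initial scan, with the cutover timestamp chosen sp.cutover before that cancellation. Below is a minimal, self-contained sketch of that cancellation pattern only, separate from the patch: runWorkload and the concrete durations are hypothetical stand-ins for the roachtest helpers, not the real workload command.

package main

import (
	"context"
	"fmt"
	"time"
)

// runWorkload stands in for a workload command started without --duration:
// it simply blocks until its context is cancelled.
func runWorkload(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

func main() {
	ctx := context.Background()

	// Hypothetical stand-ins for sp.additionalDuration and sp.cutover.
	additionalDuration := 2 * time.Second
	cutover := 500 * time.Millisecond

	workloadCtx, workloadCancel := context.WithCancel(ctx)
	defer workloadCancel()

	workloadDoneCh := make(chan struct{})
	go func() {
		// In the test, exiting before workloadCancel is called is treated as an
		// error; here we only signal completion.
		_ = runWorkload(workloadCtx)
		workloadDoneCh <- struct{}{}
	}()

	// (the initial scan completes here in the real test)

	// Cutover lands `cutover` before the workload stops, so the replication
	// stream still has a recent timestamp to catch up to.
	cutoverTime := time.Now().Add(additionalDuration - cutover)
	fmt.Println("cutover time chosen:", cutoverTime)

	select {
	case <-time.After(additionalDuration):
		workloadCancel()
		<-workloadDoneCh
		fmt.Println("workload cancelled; cutting over to", cutoverTime)
	case <-ctx.Done():
		return
	}
}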