From a39a66519eb81236c1e8ce5b75563c454854ccdd Mon Sep 17 00:00:00 2001 From: Michael Butler Date: Mon, 9 Jan 2023 09:01:50 -0500 Subject: [PATCH] c2c: increase c2c roachtest workload flexibility Previously in c2c roachtests, the foreground workload on the src cluster would run for a predefined amount of time, based on the expected initial scan time. But, if this estimated initial scan time wasn't accurate, the roachtest would not properly simulate c2c customer workload. E.g. if the initial scan actually took much longer than expected, the workload would finish before the initial scan! This patch removes the need to specify a duration for the src cluster workload. Instead, the goroutine running the workload will get cancelled at cutover time, determined by the `replicationTestSpec.additionalDuration` field, which specifies how long the workload should after the initial scan completes. This patch also adds additional logging which provides instructions for opening a sql session to the tenant and opening a tenant's dbconsole. Informs #89176 Release note: None --- pkg/cmd/roachtest/tests/cluster_to_cluster.go | 144 +++++++++++------- pkg/cmd/roachtest/tests/multitenant_utils.go | 5 + 2 files changed, 94 insertions(+), 55 deletions(-) diff --git a/pkg/cmd/roachtest/tests/cluster_to_cluster.go b/pkg/cmd/roachtest/tests/cluster_to_cluster.go index 3d1a022213f1..537056300dcb 100644 --- a/pkg/cmd/roachtest/tests/cluster_to_cluster.go +++ b/pkg/cmd/roachtest/tests/cluster_to_cluster.go @@ -122,10 +122,14 @@ func newMetric(metric float64, unit string) exportedMetric { return exportedMetric{metric, unit} } -func (em exportedMetric) String() string { +func (em exportedMetric) StringWithUnits() string { return fmt.Sprintf("%.2f %s", em.metric, em.unit) } +func (em exportedMetric) String() string { + return fmt.Sprintf("%.2f", em.metric) +} + // sizeTime captures the disk size of the nodes at some moment in time type sizeTime struct { // size is the megabytes of the objects @@ -236,6 +240,7 @@ func setupC2C( srcTenant.stop(ctx, t, c) pgCleanup() } + srcTenantInfo := clusterInfo{ name: srcTenantName, ID: srcTenantID, @@ -258,8 +263,19 @@ func setupC2C( srcTenantInfo.sql.Exec(t, `SELECT crdb_internal.update_tenant_resource_limits($1, 10000000000, 0, 10000000000, now(), 0);`, srcTenantInfo.ID) - createSystemRole(t, srcTenantInfo.name, srcTenantInfo.sql) - createSystemRole(t, destTenantInfo.name, destTenantInfo.sql) + createSystemRole(t, srcTenantInfo.name+" system tenant", srcTenantInfo.sql) + createSystemRole(t, destTenantInfo.name+" system tenant", destTenantInfo.sql) + + tenantConn, err := srcTenant.conn() + require.NoError(t, err) + createSystemRole(t, destTenantInfo.name+" app tenant", sqlutils.MakeSQLRunner(tenantConn)) + t.L().Printf(`To open a sql session on the app tenant, ssh to the tenant node and run: + ./cockroach sql url="%s"`, srcTenant.secureURL()) + t.L().Printf(`To open the app tenant's db console, run: + 1. roachprod adminui $CLUSTER:%d + 2. change the port in the url to %d +`, srcTenantNode, tenantHTTPPort) + return &c2cSetup{ src: srcTenantInfo, dst: destTenantInfo, @@ -272,7 +288,7 @@ func createSystemRole(t test.Test, name string, sql *sqlutils.SQLRunner) { password := "roach" sql.Exec(t, fmt.Sprintf(`CREATE ROLE %s WITH LOGIN PASSWORD '%s'`, username, password)) sql.Exec(t, fmt.Sprintf(`GRANT ADMIN TO %s`, username)) - t.L().Printf(`Log into the %s system tenant db console with username "%s" and password "%s"`, + t.L().Printf(`Log into the %s db console with username "%s" and password "%s"`, name, username, password) } @@ -281,9 +297,8 @@ type streamingWorkload interface { // replication stream begins sourceInitCmd(pgURL string) string - // sourceRunCmd returns a command that will run a workload for the given duration on the src - // cluster during the replication stream. - sourceRunCmd(pgURL string, duration time.Duration) string + // sourceRunCmd returns a command that will run a workload + sourceRunCmd(pgURL string) string } type replicateTPCC struct { @@ -295,10 +310,10 @@ func (tpcc replicateTPCC) sourceInitCmd(pgURL string) string { tpcc.warehouses, pgURL) } -func (tpcc replicateTPCC) sourceRunCmd(pgURL string, duration time.Duration) string { +func (tpcc replicateTPCC) sourceRunCmd(pgURL string) string { // added --tolerate-errors flags to prevent test from flaking due to a transaction retry error - return fmt.Sprintf(`./workload run tpcc --warehouses %d --duration %dm --tolerate-errors '%s'`, - tpcc.warehouses, int(duration.Minutes()), pgURL) + return fmt.Sprintf(`./workload run tpcc --warehouses %d --tolerate-errors '%s'`, + tpcc.warehouses, pgURL) } type replicateKV struct { @@ -309,10 +324,9 @@ func (kv replicateKV) sourceInitCmd(pgURL string) string { return "" } -func (kv replicateKV) sourceRunCmd(pgURL string, duration time.Duration) string { +func (kv replicateKV) sourceRunCmd(pgURL string) string { // added --tolerate-errors flags to prevent test from flaking due to a transaction retry error - return fmt.Sprintf(`./workload run kv --tolerate-errors --init --duration %dm --read-percent %d '%s'`, - int(duration.Minutes()), + return fmt.Sprintf(`./workload run kv --tolerate-errors --init --read-percent %d '%s'`, kv.readPercent, pgURL) } @@ -349,7 +363,7 @@ type replicationTestSpec struct { func registerClusterToCluster(r registry.Registry) { for _, sp := range []replicationTestSpec{ { - name: "c2c/tpcc", + name: "c2c/tpcc/warehouses=500/duration=10/cutover=5", srcKVNodes: 4, dstKVNodes: 4, cpus: 8, @@ -362,12 +376,13 @@ func registerClusterToCluster(r registry.Registry) { timeout: 1 * time.Hour, additionalDuration: 10 * time.Minute, cutover: 5 * time.Minute, - }, { + }, + { name: "c2c/kv0", srcKVNodes: 3, dstKVNodes: 3, cpus: 8, - pdSize: 1000, + pdSize: 100, workload: replicateKV{readPercent: 0}, timeout: 1 * time.Hour, additionalDuration: 10 * time.Minute, @@ -385,11 +400,16 @@ func registerClusterToCluster(r registry.Registry) { Name: sp.name, Owner: registry.OwnerDisasterRecovery, Cluster: r.MakeClusterSpec(sp.dstKVNodes+sp.srcKVNodes+1, clusterOps...), + Timeout: sp.timeout, RequiresLicense: true, Run: func(ctx context.Context, t test.Test, c cluster.Cluster) { setup, cleanup := setupC2C(ctx, t, c, sp.srcKVNodes, sp.dstKVNodes) defer cleanup() + + t.L().Printf("tenant secureURL: %s; http port: %s; sql port: %s", + setup.src.tenant.secureURL(), setup.src.tenant.httpPort, setup.src.tenant.sqlPort) + m := c.NewMonitor(ctx, setup.src.kvNodes.Merge(setup.dst.kvNodes)) du, err := NewDiskUsageTracker(c, t.L()) require.NoError(t, err) @@ -401,34 +421,34 @@ func registerClusterToCluster(r registry.Registry) { setup.metrics.initialScanEnd = newSizeTime(ctx, du, setup.src.kvNodes) initDuration = setup.metrics.initialScanEnd.time.Sub(setup.metrics.start.time) - t.L().Printf("src cluster workload initialization took %d minutes", int(initDuration.Minutes())) + t.L().Printf("src cluster workload initialization took %s minutes", initDuration) } + t.L().Printf("begin workload on src cluster") + workloadCtx, workloadCancel := context.WithCancel(ctx) + defer workloadCancel() + + workloadDoneCh := make(chan struct{}) + go func() { + err := c.RunE(workloadCtx, c.Node(setup.src.sqlNode), + sp.workload.sourceRunCmd(setup.src.tenant. + secureURL())) + // The workload should only stop if the workloadCtx is cancelled once + // sp.additionalDuration has elapsed after the initial scan completes. + if workloadCtx.Err() == nil { + // Implies the workload context was not cancelled and the workload cmd returned on + // its own. + t.Fatalf(`Workload context was not cancelled. Error returned by workload cmd: %s`, err) + } + workloadDoneCh <- struct{}{} + }() + t.Status("starting replication stream") streamReplStmt := fmt.Sprintf("CREATE TENANT %q FROM REPLICATION OF %q ON '%s'", setup.dst.name, setup.src.name, setup.src.pgURL) setup.dst.sql.Exec(t, streamReplStmt) ingestionJobID := getIngestionJobID(t, setup.dst.sql, setup.dst.name) - // The replication stream is expected to spend some time conducting an - // initial scan, ideally on the same order as the `initDuration`, the - // time taken to initially populate the source cluster. To ensure the - // latency verifier stabilizes, ensure the workload and the replication - // stream run for a significant amount of time after the initial scan - // ends. Explicitly, set the workload to run for the estimated initial scan - // runtime + the user specified workload duration. - workloadDuration := initDuration + sp.additionalDuration - workloadDoneCh := make(chan struct{}) - m.Go(func(ctx context.Context) error { - defer close(workloadDoneCh) - cmd := sp.workload.sourceRunCmd(setup.src.tenant.secureURL(), workloadDuration) - c.Run(ctx, c.Node(setup.src.sqlNode), cmd) - return nil - }) - - cutoverTime := chooseCutover(t, setup.dst.sql, workloadDuration, sp.cutover) - t.Status("cutover time chosen: %s", cutoverTime.String()) - // TODO(ssd): The job doesn't record the initial // statement time, so we can't correctly measure the // initial scan time here. @@ -439,17 +459,45 @@ func registerClusterToCluster(r registry.Registry) { return lv.pollLatency(ctx, setup.dst.db, ingestionJobID, time.Second, workloadDoneCh) }) - t.Status("waiting for replication stream to finish ingesting initial scan") + t.L().Printf("waiting for replication stream to finish ingesting initial scan") waitForHighWatermark(t, setup.dst.db, ingestionJobID, sp.timeout/2) - t.Status("waiting for src cluster workload to complete") - m.Wait() - - t.Status("waiting for replication stream to cutover") + t.Status(fmt.Sprintf(`initial scan complete. run workload and repl. stream for another %s minutes`, + sp.additionalDuration)) + + var currentTime time.Time + setup.dst.sql.QueryRow(t, "SELECT clock_timestamp()").Scan(¤tTime) + cutoverTime := currentTime.Add(sp.additionalDuration - sp.cutover) + t.Status("cutover time chosen: ", cutoverTime.String()) + + select { + case <-time.After(sp.additionalDuration): + workloadCancel() + t.L().Printf("workload has finished after %s", sp.additionalDuration) + case <-ctx.Done(): + t.L().Printf(`roachtest context cancelled while waiting for workload duration to complete`) + return + } + t.Status(fmt.Sprintf("waiting for replication stream to cutover to %s", cutoverTime.String())) setup.metrics.cutoverStart = newSizeTime(ctx, du, setup.dst.kvNodes) stopReplicationStream(t, setup.dst.sql, ingestionJobID, cutoverTime) setup.metrics.cutoverEnd = newSizeTime(ctx, du, setup.dst.kvNodes) + // TODO(msbutler): export metrics to roachperf or prom/grafana + exportedMetrics := setup.metrics.export() + t.L().Printf(`Initial Scan: Duration, Size, Throughput; Cutover: Duration, Size, Throughput`) + t.L().Printf(`%s %s %s %s %s %s`, + exportedMetrics["InitialScanDuration"].String(), + exportedMetrics["InitialScanSize"].String(), + exportedMetrics["InitialScanThroughput"].String(), + exportedMetrics["CutoverDuration"].String(), + exportedMetrics["CutoverSize"].String(), + exportedMetrics["CutoverThroughput"].String(), + ) + for key, metric := range exportedMetrics { + t.L().Printf("%s: %s", key, metric.StringWithUnits()) + } + t.Status("comparing fingerprints") // Currently, it takes about 15 minutes to generate a fingerprint for // about 30 GB of data. Once the fingerprinting job is used instead, @@ -464,12 +512,6 @@ func registerClusterToCluster(r registry.Registry) { setup, hlc.Timestamp{WallTime: cutoverTime.UnixNano()}) lv.assertValid(t) - - // TODO(msbutler): export metrics to roachperf or prom/grafana - exportedMetrics := setup.metrics.export() - for key, metric := range exportedMetrics { - t.L().Printf("%s: %s", key, metric.String()) - } }, }) } @@ -483,14 +525,6 @@ func getIngestionJobID(t test.Test, dstSQL *sqlutils.SQLRunner, dstTenantName st return int(tenantInfo.TenantReplicationJobID) } -func chooseCutover( - t test.Test, dstSQL *sqlutils.SQLRunner, workloadDuration time.Duration, cutover time.Duration, -) time.Time { - var currentTime time.Time - dstSQL.QueryRow(t, "SELECT clock_timestamp()").Scan(¤tTime) - return currentTime.Add(workloadDuration - cutover) -} - func compareTenantFingerprintsAtTimestamp( t test.Test, m cluster.Monitor, setup *c2cSetup, ts hlc.Timestamp, ) { diff --git a/pkg/cmd/roachtest/tests/multitenant_utils.go b/pkg/cmd/roachtest/tests/multitenant_utils.go index dbb39f4fe0b3..0f2774081abf 100644 --- a/pkg/cmd/roachtest/tests/multitenant_utils.go +++ b/pkg/cmd/roachtest/tests/multitenant_utils.go @@ -251,6 +251,11 @@ func (tn *tenantNode) start(ctx context.Context, t test.Test, c cluster.Cluster, t.L().Printf("sql server for tenant %d (instance %d) now running", tn.tenantID, tn.instanceID) } +// conn returns a sql connection to the tenant +func (tn *tenantNode) conn() (*gosql.DB, error) { + return gosql.Open("postgres", tn.pgURL) +} + func startTenantServer( tenantCtx context.Context, c cluster.Cluster,