diff --git a/pkg/cmd/roachtest/tests/backup.go b/pkg/cmd/roachtest/tests/backup.go index aa8766ab6255..8e4039311e81 100644 --- a/pkg/cmd/roachtest/tests/backup.go +++ b/pkg/cmd/roachtest/tests/backup.go @@ -1113,11 +1113,19 @@ func registerBackup(r registry.Registry) { Cluster: r.MakeClusterSpec(3, spec.CPU(8)), EncryptionSupport: registry.EncryptionMetamorphic, Run: func(ctx context.Context, t test.Test, c cluster.Cluster) { - runBackupMVCCRangeTombstones(ctx, t, c) + runBackupMVCCRangeTombstones(ctx, t, c, mvccRangeTombstoneConfig{}) }, }) } +type mvccRangeTombstoneConfig struct { + tenantName string + skipClusterSetup bool + + // TODO(msbutler): delete once tenants can back up to nodelocal. + skipBackupRestore bool +} + // runBackupMVCCRangeTombstones tests that backup and restore works in the // presence of MVCC range tombstones. It uses data from TPCH's order table, 16 // GB across 8 CSV files. @@ -1136,21 +1144,27 @@ func registerBackup(r registry.Registry) { // We then do point-in-time restores of the database at times 'initial', // 'canceled', 'completed', and the latest time, and compare the fingerprints to // the original data. -func runBackupMVCCRangeTombstones(ctx context.Context, t test.Test, c cluster.Cluster) { - c.Put(ctx, t.Cockroach(), "./cockroach") - c.Put(ctx, t.DeprecatedWorkload(), "./workload") // required for tpch - c.Start(ctx, t.L(), option.DefaultStartOptsNoBackups(), install.MakeClusterSettings()) +func runBackupMVCCRangeTombstones( + ctx context.Context, t test.Test, c cluster.Cluster, config mvccRangeTombstoneConfig, +) { + if !config.skipClusterSetup { + c.Put(ctx, t.Cockroach(), "./cockroach") + c.Put(ctx, t.DeprecatedWorkload(), "./workload") // required for tpch + c.Start(ctx, t.L(), option.DefaultStartOptsNoBackups(), install.MakeClusterSettings()) + } t.Status("starting csv servers") c.Run(ctx, c.All(), `./cockroach workload csv-server --port=8081 &> logs/workload-csv-server.log < /dev/null &`) - conn := c.Conn(ctx, t.L(), 1) + conn := c.Conn(ctx, t.L(), 1, option.TenantName(config.tenantName)) // Configure cluster. t.Status("configuring cluster") _, err := conn.Exec(`SET CLUSTER SETTING kv.bulk_ingest.max_index_buffer_size = '2gb'`) require.NoError(t, err) - _, err = conn.Exec(`SET CLUSTER SETTING storage.mvcc.range_tombstones.enabled = 't'`) - require.NoError(t, err) + if config.tenantName == "" { + _, err = conn.Exec(`SET CLUSTER SETTING storage.mvcc.range_tombstones.enabled = 't'`) + require.NoError(t, err) + } _, err = conn.Exec(`SET CLUSTER SETTING server.debug.default_vmodule = 'txn=2,sst_batcher=4, revert=2'`) require.NoError(t, err) @@ -1205,6 +1219,9 @@ revert=2'`) } fingerprint := func(name, database, table string) (string, string, string) { + if config.skipBackupRestore { + return "", "", "" + } var ts string require.NoError(t, conn.QueryRowContext(ctx, `SELECT now()`).Scan(&ts)) @@ -1247,10 +1264,12 @@ revert=2'`) // Take a full backup, using a database backup in order to perform a final // incremental backup after the table has been dropped. - t.Status("taking full backup") dest := "nodelocal://1/" + destinationName(c) - _, err = conn.ExecContext(ctx, `BACKUP DATABASE tpch INTO $1 WITH revision_history`, dest) - require.NoError(t, err) + if !config.skipBackupRestore { + t.Status("taking full backup") + _, err = conn.ExecContext(ctx, `BACKUP DATABASE tpch INTO $1 WITH revision_history`, dest) + require.NoError(t, err) + } // Import and cancel even-numbered files twice. 
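// Editor's note (illustrative sketch, not part of the patch): how the new
// mvccRangeTombstoneConfig is intended to be used. The zero value keeps the
// original standalone behavior; the cluster-to-cluster bulk-ops workload in
// cluster_to_cluster.go below reuses the same test body against an already
// started cluster and a named tenant. The tenant name shown is a hypothetical
// placeholder; the real caller passes setup.src.name.
//
//	// Standalone roachtest: set up the cluster and exercise backup/restore.
//	runBackupMVCCRangeTombstones(ctx, t, c, mvccRangeTombstoneConfig{})
//
//	// As a c2c workload: the cluster is already running, the workload targets a
//	// tenant, and backup/restore is skipped until tenants can back up to nodelocal.
//	runBackupMVCCRangeTombstones(ctx, t, c, mvccRangeTombstoneConfig{
//		tenantName:        "src-tenant", // hypothetical
//		skipClusterSetup:  true,
//		skipBackupRestore: true,
//	})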
files = []string{ @@ -1341,42 +1360,44 @@ revert=2'`) require.ErrorIs(t, err, gosql.ErrNoRows) // Take a final incremental backup. - t.Status("taking incremental backup") - _, err = conn.ExecContext(ctx, `BACKUP DATABASE tpch INTO LATEST IN $1 WITH revision_history`, - dest) - require.NoError(t, err) + if !config.skipBackupRestore { + t.Status("taking incremental backup") + _, err = conn.ExecContext(ctx, `BACKUP DATABASE tpch INTO LATEST IN $1 WITH revision_history`, + dest) + require.NoError(t, err) - // Schedule a final restore of the latest backup (above). - restores = append(restores, restore{ - name: "dropped", - expectNoTables: true, - }) + // Schedule a final restore of the latest backup (above). + restores = append(restores, restore{ + name: "dropped", + expectNoTables: true, + }) - // Restore backups at specific times and verify them. - for _, r := range restores { - t.Status(fmt.Sprintf("restoring backup at time '%s'", r.name)) - db := "restore_" + r.name - if r.time != "" { - _, err = conn.ExecContext(ctx, fmt.Sprintf( - `RESTORE DATABASE tpch FROM LATEST IN '%s' AS OF SYSTEM TIME '%s' WITH new_db_name = '%s'`, - dest, r.time, db)) - require.NoError(t, err) - } else { - _, err = conn.ExecContext(ctx, fmt.Sprintf( - `RESTORE DATABASE tpch FROM LATEST IN '%s' WITH new_db_name = '%s'`, dest, db)) - require.NoError(t, err) - } + // Restore backups at specific times and verify them. + for _, r := range restores { + t.Status(fmt.Sprintf("restoring backup at time '%s'", r.name)) + db := "restore_" + r.name + if r.time != "" { + _, err = conn.ExecContext(ctx, fmt.Sprintf( + `RESTORE DATABASE tpch FROM LATEST IN '%s' AS OF SYSTEM TIME '%s' WITH new_db_name = '%s'`, + dest, r.time, db)) + require.NoError(t, err) + } else { + _, err = conn.ExecContext(ctx, fmt.Sprintf( + `RESTORE DATABASE tpch FROM LATEST IN '%s' WITH new_db_name = '%s'`, dest, db)) + require.NoError(t, err) + } - if expect := r.expectFingerprint; expect != "" { - _, _, fp = fingerprint(r.name, db, "orders") - require.Equal(t, expect, fp, "fingerprint mismatch for restore at time '%s'", r.name) - } - if r.expectNoTables { - var tableCount int - require.NoError(t, conn.QueryRowContext(ctx, fmt.Sprintf( - `SELECT count(*) FROM [SHOW TABLES FROM %s]`, db)).Scan(&tableCount)) - require.Zero(t, tableCount, "found tables in restore at time '%s'", r.name) - t.Status("confirmed no tables in database " + db) + if expect := r.expectFingerprint; expect != "" { + _, _, fp = fingerprint(r.name, db, "orders") + require.Equal(t, expect, fp, "fingerprint mismatch for restore at time '%s'", r.name) + } + if r.expectNoTables { + var tableCount int + require.NoError(t, conn.QueryRowContext(ctx, fmt.Sprintf( + `SELECT count(*) FROM [SHOW TABLES FROM %s]`, db)).Scan(&tableCount)) + require.Zero(t, tableCount, "found tables in restore at time '%s'", r.name) + t.Status("confirmed no tables in database " + db) + } } } } diff --git a/pkg/cmd/roachtest/tests/cluster_to_cluster.go b/pkg/cmd/roachtest/tests/cluster_to_cluster.go index 5f20b5ad262e..c6c01f55e9a8 100644 --- a/pkg/cmd/roachtest/tests/cluster_to_cluster.go +++ b/pkg/cmd/roachtest/tests/cluster_to_cluster.go @@ -67,7 +67,6 @@ type c2cSetup struct { src clusterInfo dst clusterInfo workloadNode option.NodeListOption - metrics c2cMetrics } var c2cPromMetrics = map[string]clusterstats.ClusterStat{ @@ -90,46 +89,6 @@ func sumOverLabel(stats map[string]map[string]clusterstats.StatPoint, label stri return mean } -func (cc *c2cSetup) startStatsCollection( - ctx context.Context, t test.Test, c 
cluster.Cluster, -) func(time.Time) map[string]float64 { - - if c.IsLocal() { - // Grafana does not run locally. - return func(snapTime time.Time) map[string]float64 { - return map[string]float64{} - } - } - // TODO(msbutler): pass a proper cluster replication dashboard and figure out why we need to - // pass a grafana dashboard for this to work - cfg := (&prometheus.Config{}). - WithPrometheusNode(cc.workloadNode.InstallNodes()[0]). - WithCluster(cc.dst.nodes.InstallNodes()). - WithNodeExporter(cc.dst.nodes.InstallNodes()). - WithGrafanaDashboard("https://go.crdb.dev/p/changefeed-roachtest-grafana-dashboard") - - require.NoError(t, c.StartGrafana(ctx, t.L(), cfg)) - t.L().Printf("Prom has started") - - client, err := clusterstats.SetupCollectorPromClient(ctx, c, t.L(), cfg) - require.NoError(t, err, "error creating prometheus client for stats collector") - collector := clusterstats.NewStatsCollector(ctx, client) - - return func(snapTime time.Time) map[string]float64 { - metricSnap := make(map[string]float64) - for name, stat := range c2cPromMetrics { - point, err := collector.CollectPoint(ctx, t.L(), snapTime, stat.Query) - if err != nil { - t.L().Errorf("Could not query prom %s", err.Error()) - } - // TODO(msbutler): update the CollectPoint api to conduct the sum in Prom instead. - metricSnap[name] = sumOverLabel(point, stat.LabelName) - t.L().Printf("%s: %.2f", name, metricSnap[name]) - } - return metricSnap - } -} - // DiskUsageTracker can grab the disk usage of the provided cluster. // // TODO(msbutler): deprecate this, once restore roachtests also use prom setup. @@ -170,11 +129,10 @@ type metricSnapshot struct { func newMetricSnapshot( metricSnapper func(time.Time) map[string]float64, ts time.Time, ) metricSnapshot { - snap := metricSnapshot{ + return metricSnapshot{ time: ts, metrics: metricSnapper(ts), } - return snap } // calcThroughput estimates throughput between two time intervals as metric_unit/s/node @@ -252,14 +210,139 @@ func (m c2cMetrics) export(t test.Test, nodeCount int) { aggregate(m.cutoverTo, m.cutoverStart, "Cutover", m.cutoverEnd.time.Sub(m.cutoverStart.time)) } -func setupC2C( - ctx context.Context, t test.Test, c cluster.Cluster, srcKVNodes, - dstKVNodes int, -) *c2cSetup { +type streamingWorkload interface { + // sourceInitCmd returns a command that will populate the src cluster with data before the + // replication stream begins + sourceInitCmd(tenantName string, nodes option.NodeListOption) string + + // sourceRunCmd returns a command that will run a workload + sourceRunCmd(tenantName string, nodes option.NodeListOption) string + + runDriver(workloadCtx context.Context, c cluster.Cluster, t test.Test, setup *c2cSetup) error +} + +func defaultWorkloadDriver( + workloadCtx context.Context, setup *c2cSetup, c cluster.Cluster, workload streamingWorkload, +) error { + return c.RunE(workloadCtx, setup.workloadNode, workload.sourceRunCmd(setup.src.name, setup.src.nodes)) +} + +type replicateTPCC struct { + warehouses int +} + +func (tpcc replicateTPCC) sourceInitCmd(tenantName string, nodes option.NodeListOption) string { + return fmt.Sprintf(`./workload init tpcc --data-loader import --warehouses %d {pgurl%s:%s}`, + tpcc.warehouses, nodes, tenantName) +} + +func (tpcc replicateTPCC) sourceRunCmd(tenantName string, nodes option.NodeListOption) string { + // added --tolerate-errors flags to prevent test from flaking due to a transaction retry error + return fmt.Sprintf(`./workload run tpcc --warehouses %d --tolerate-errors {pgurl%s:%s}`, + tpcc.warehouses, nodes, 
tenantName) +} + +func (tpcc replicateTPCC) runDriver( + workloadCtx context.Context, c cluster.Cluster, t test.Test, setup *c2cSetup, +) error { + return defaultWorkloadDriver(workloadCtx, setup, c, tpcc) +} + +type replicateKV struct { + readPercent int + + // This field is merely used to debug the c2c framework for finite workloads. + debugRunDurationMinutes int +} + +func (kv replicateKV) sourceInitCmd(tenantName string, nodes option.NodeListOption) string { + return "" +} + +func (kv replicateKV) sourceRunCmd(tenantName string, nodes option.NodeListOption) string { + debugDuration := "" + if kv.debugRunDurationMinutes != 0 { + debugDuration = fmt.Sprintf("--duration %dm", kv.debugRunDurationMinutes) + } + // added --tolerate-errors flags to prevent test from flaking due to a transaction retry error + return fmt.Sprintf(`./workload run kv --tolerate-errors --init %s --read-percent %d {pgurl%s:%s}`, + debugDuration, + kv.readPercent, + nodes, + tenantName) +} + +func (kv replicateKV) runDriver( + workloadCtx context.Context, c cluster.Cluster, t test.Test, setup *c2cSetup, +) error { + return defaultWorkloadDriver(workloadCtx, setup, c, kv) +} + +type replicateBulkOps struct { +} + +func (bo replicateBulkOps) sourceInitCmd(tenantName string, nodes option.NodeListOption) string { + return "" +} + +func (bo replicateBulkOps) sourceRunCmd(tenantName string, nodes option.NodeListOption) string { + return "" +} + +func (bo replicateBulkOps) runDriver( + workloadCtx context.Context, c cluster.Cluster, t test.Test, setup *c2cSetup, +) error { + runBackupMVCCRangeTombstones(workloadCtx, t, c, mvccRangeTombstoneConfig{ + skipBackupRestore: true, + skipClusterSetup: true, + tenantName: setup.src.name}) + return nil +} + +type replicationTestSpec struct { + // name specifies the name of the roachtest + name string + + // srcodes is the number of nodes on the source cluster. + srcNodes int + + // dstNodes is the number of nodes on the destination cluster. + dstNodes int + + // cpus is the per node cpu count. + cpus int + + // pdSize specifies the pd-ssd volume (in GB). If set to 0, local ssds are used. + pdSize int + + // workload specifies the streaming workload. + workload streamingWorkload + + // additionalDuration specifies how long the workload will run after the initial scan + //completes. If the time out is set to 0, it will run until completion. + additionalDuration time.Duration + + // cutover specifies how soon before the workload ends to choose a cutover timestamp. + cutover time.Duration + + // timeout specifies when the roachtest should fail due to timeout. + timeout time.Duration + + // If non-empty, the test will be skipped with the supplied reason. + skip string + + // fields below are instantiated at runtime + setup *c2cSetup + t test.Test + c cluster.Cluster + metrics *c2cMetrics +} + +func (sp *replicationTestSpec) setupC2C(ctx context.Context, t test.Test, c cluster.Cluster) { c.Put(ctx, t.Cockroach(), "./cockroach") - srcCluster := c.Range(1, srcKVNodes) - dstCluster := c.Range(srcKVNodes+1, srcKVNodes+dstKVNodes) - workloadNode := c.Node(srcKVNodes + dstKVNodes + 1) + srcCluster := c.Range(1, sp.srcNodes) + dstCluster := c.Range(sp.srcNodes+1, sp.srcNodes+sp.dstNodes) + workloadNode := c.Node(sp.srcNodes + sp.dstNodes + 1) c.Put(ctx, t.DeprecatedWorkload(), "./workload", workloadNode) // TODO(msbutler): allow for backups once this test stabilizes a bit more. @@ -270,7 +353,7 @@ func setupC2C( // TODO(msbutler): allow for backups once this test stabilizes a bit more. 
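// Editor's note (sketch, not part of the patch): node layout implied by
// setupC2C. For the c2c/BulkOps spec registered below (srcNodes: 4,
// dstNodes: 4), the roachtest cluster is created with srcNodes+dstNodes+1 = 9
// nodes and the ranges above resolve to:
//
//	srcCluster   = nodes 1-4   // source cluster
//	dstCluster   = nodes 5-8   // destination cluster
//	workloadNode = node 9      // runs ./workload, and hosts Prometheus for stats collection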
dstStartOps := option.DefaultStartOptsNoBackups() - dstStartOps.RoachprodOpts.InitTarget = srcKVNodes + 1 + dstStartOps.RoachprodOpts.InitTarget = sp.srcNodes + 1 dstClusterSetting := install.MakeClusterSettings(install.SecureOption(true)) c.Start(ctx, t.L(), dstStartOps, dstClusterSetting, dstCluster) @@ -314,82 +397,167 @@ func setupC2C( db: destDB, nodes: dstCluster} - setup := &c2cSetup{ + sp.setup = &c2cSetup{ src: srcTenantInfo, dst: destTenantInfo, - workloadNode: workloadNode, - metrics: c2cMetrics{}} - - return setup + workloadNode: workloadNode} + sp.t = t + sp.c = c + sp.metrics = &c2cMetrics{} } -type streamingWorkload interface { - // sourceInitCmd returns a command that will populate the src cluster with data before the - // replication stream begins - sourceInitCmd(tenantName string, nodes option.NodeListOption) string +func (sp *replicationTestSpec) startStatsCollection( + ctx context.Context, +) func(time.Time) map[string]float64 { - // sourceRunCmd returns a command that will run a workload - sourceRunCmd(tenantName string, nodes option.NodeListOption) string -} + if sp.c.IsLocal() { + sp.t.L().Printf("Local test. Don't setup grafana") + // Grafana does not run locally. + return func(snapTime time.Time) map[string]float64 { + return map[string]float64{} + } + } + // TODO(msbutler): pass a proper cluster replication dashboard and figure out why we need to + // pass a grafana dashboard for this to work + cfg := (&prometheus.Config{}). + WithPrometheusNode(sp.setup.workloadNode.InstallNodes()[0]). + WithCluster(sp.setup.dst.nodes.InstallNodes()). + WithNodeExporter(sp.setup.dst.nodes.InstallNodes()). + WithGrafanaDashboard("https://go.crdb.dev/p/changefeed-roachtest-grafana-dashboard") -type replicateTPCC struct { - warehouses int -} + require.NoError(sp.t, sp.c.StartGrafana(ctx, sp.t.L(), cfg)) + sp.t.L().Printf("Prom has started") -func (tpcc replicateTPCC) sourceInitCmd(tenantName string, nodes option.NodeListOption) string { - return fmt.Sprintf(`./workload init tpcc --data-loader import --warehouses %d {pgurl%s:%s}`, - tpcc.warehouses, nodes, tenantName) -} + client, err := clusterstats.SetupCollectorPromClient(ctx, sp.c, sp.t.L(), cfg) + require.NoError(sp.t, err, "error creating prometheus client for stats collector") + collector := clusterstats.NewStatsCollector(ctx, client) -func (tpcc replicateTPCC) sourceRunCmd(tenantName string, nodes option.NodeListOption) string { - // added --tolerate-errors flags to prevent test from flaking due to a transaction retry error - return fmt.Sprintf(`./workload run tpcc --warehouses %d --tolerate-errors {pgurl%s:%s}`, - tpcc.warehouses, nodes, tenantName) + return func(snapTime time.Time) map[string]float64 { + metricSnap := make(map[string]float64) + for name, stat := range c2cPromMetrics { + point, err := collector.CollectPoint(ctx, sp.t.L(), snapTime, stat.Query) + if err != nil { + sp.t.L().Errorf("Could not query prom %s", err.Error()) + } + // TODO(msbutler): update the CollectPoint api to conduct the sum in Prom instead. 
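// Editor's note (sketch, not part of the patch): the snapper returned by
// startStatsCollection is used by the driver to bracket each test phase with
// metric snapshots, e.g. around the initial scan (mirroring the Run closure
// later in this file; initalScanStart is the existing field name):
//
//	metricSnapper := sp.startStatsCollection(ctx)
//	sp.metrics.initalScanStart = newMetricSnapshot(metricSnapper, timeutil.Now())
//	sp.waitForHighWatermark(ingestionJobID, sp.timeout/2)
//	sp.metrics.initialScanEnd = newMetricSnapshot(metricSnapper, timeutil.Now())
//
// calcThroughput/export then turn the before/after snapshots into per-node rates.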
+ metricSnap[name] = sumOverLabel(point, stat.LabelName) + sp.t.L().Printf("%s: %.2f", name, metricSnap[name]) + } + return metricSnap + } } -type replicateKV struct { - readPercent int +func (sp *replicationTestSpec) preStreamingWorkload(ctx context.Context) { + if initCmd := sp.workload.sourceInitCmd(sp.setup.src.name, sp.setup.src.nodes); initCmd != "" { + sp.t.Status("populating source cluster before replication") + initStart := timeutil.Now() + sp.c.Run(ctx, sp.setup.workloadNode, initCmd) + sp.t.L().Printf("src cluster workload initialization took %s minutes", + timeutil.Since(initStart).Minutes()) + } } -func (kv replicateKV) sourceInitCmd(tenantName string, nodes option.NodeListOption) string { - return "" +func (sp *replicationTestSpec) startReplicationStream() int { + streamReplStmt := fmt.Sprintf("CREATE TENANT %q FROM REPLICATION OF %q ON '%s'", + sp.setup.dst.name, sp.setup.src.name, sp.setup.src.pgURL) + sp.setup.dst.sysSQL.Exec(sp.t, streamReplStmt) + return getIngestionJobID(sp.t, sp.setup.dst.sysSQL, sp.setup.dst.name) } -func (kv replicateKV) sourceRunCmd(tenantName string, nodes option.NodeListOption) string { - // added --tolerate-errors flags to prevent test from flaking due to a transaction retry error - return fmt.Sprintf(`./workload run kv --tolerate-errors --init --read-percent %d {pgurl%s:%s}`, - kv.readPercent, - nodes, - tenantName) +func (sp *replicationTestSpec) runWorkload(ctx context.Context) error { + return sp.workload.runDriver(ctx, sp.c, sp.t, sp.setup) } -type replicationTestSpec struct { - // name specifies the name of the roachtest - name string - - // srcodes is the number of nodes on the source cluster. - srcNodes int +func (sp *replicationTestSpec) waitForHighWatermark(ingestionJobID int, wait time.Duration) { + testutils.SucceedsWithin(sp.t, func() error { + info, err := getStreamIngestionJobInfo(sp.setup.dst.db, ingestionJobID) + if err != nil { + return err + } + if info.GetHighWater().IsZero() { + return errors.New("no high watermark") + } + return nil + }, wait) +} - // dstNodes is the number of nodes on the destination cluster. - dstNodes int +func (sp *replicationTestSpec) getWorkloadTimeout() time.Duration { + if sp.additionalDuration != 0 { + return sp.additionalDuration + } + return sp.timeout +} - // cpus is the per node cpu count. - cpus int +// getReplicationRetainedTime returns the `retained_time` of the replication +// job. +func (sp *replicationTestSpec) getReplicationRetainedTime() time.Time { + var retainedTime time.Time + sp.setup.dst.sysSQL.QueryRow(sp.t, + `SELECT retained_time FROM [SHOW TENANT $1 WITH REPLICATION STATUS]`, + roachpb.TenantName(sp.setup.dst.name)).Scan(&retainedTime) + return retainedTime +} - // pdSize specifies the pd-ssd volume (in GB). If set to 0, local ssds are used. 
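// Editor's note (sketch, not part of the patch): startReplicationStream above
// renders a statement of roughly this shape against the destination's system
// tenant (tenant names and the pgURL are hypothetical placeholders):
//
//	CREATE TENANT "destination" FROM REPLICATION OF "source" ON 'postgres://...'
//
// It then looks up the resulting ingestion job via getIngestionJobID so the
// driver can poll its high-water mark (waitForHighWatermark) and, later, its
// completion status (stopReplicationStream).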
- pdSize int +func (sp *replicationTestSpec) stopReplicationStream(ingestionJob int, cutoverTime time.Time) { + sp.setup.dst.sysSQL.Exec(sp.t, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`, sp.setup.dst.name, cutoverTime) + err := retry.ForDuration(time.Minute*5, func() error { + var status string + var payloadBytes []byte + sp.setup.dst.sysSQL.QueryRow(sp.t, `SELECT status, payload FROM system.jobs WHERE id = $1`, + ingestionJob).Scan(&status, &payloadBytes) + if jobs.Status(status) == jobs.StatusFailed { + payload := &jobspb.Payload{} + if err := protoutil.Unmarshal(payloadBytes, payload); err == nil { + sp.t.Fatalf("job failed: %s", payload.Error) + } + sp.t.Fatalf("job failed") + } + if e, a := jobs.StatusSucceeded, jobs.Status(status); e != a { + return errors.Errorf("expected job status %s, but got %s", e, a) + } + return nil + }) + require.NoError(sp.t, err) +} - // workload specifies the streaming workload. - workload streamingWorkload +func (sp *replicationTestSpec) compareTenantFingerprintsAtTimestamp( + ctx context.Context, startTime, endTime time.Time, +) { + sp.t.Status(fmt.Sprintf("comparing tenant fingerprints between start time %s and end time %s", + startTime.UTC(), endTime.UTC())) - // additionalDuration specifies how long the workload will run after the initial scan completes. - additionalDuration time.Duration + // TODO(adityamaru,lidorcarmel): Once we agree on the format and precision we + // display all user facing timestamps with, we should revisit how we format + // the start time to ensure we are fingerprinting from the most accurate lower + // bound. + microSecondRFC3339Format := "2006-01-02 15:04:05.999999" + startTimeStr := startTime.Format(microSecondRFC3339Format) + aost := hlc.Timestamp{WallTime: endTime.UnixNano()}.AsOfSystemTime() + fingerprintQuery := fmt.Sprintf(` +SELECT * +FROM crdb_internal.fingerprint(crdb_internal.tenant_span($1::INT), '%s'::TIMESTAMPTZ, true) +AS OF SYSTEM TIME '%s'`, startTimeStr, aost) - // cutover specifies how soon before the workload ends to choose a cutover timestamp. - cutover time.Duration + var srcFingerprint int64 + fingerPrintMonitor := sp.c.NewMonitor(ctx, sp.setup.src.nodes.Merge(sp.setup.dst.nodes)) + fingerPrintMonitor.Go(func(ctx context.Context) error { + sp.setup.src.sysSQL.QueryRow(sp.t, fingerprintQuery, sp.setup.src.ID).Scan(&srcFingerprint) + return nil + }) + var destFingerprint int64 + fingerPrintMonitor.Go(func(ctx context.Context) error { + // TODO(adityamaru): Measure and record fingerprinting throughput. + sp.metrics.fingerprintingStart = timeutil.Now() + sp.setup.dst.sysSQL.QueryRow(sp.t, fingerprintQuery, sp.setup.dst.ID).Scan(&destFingerprint) + sp.metrics.fingerprintingEnd = timeutil.Now() + fingerprintingDuration := sp.metrics.fingerprintingEnd.Sub(sp.metrics.fingerprintingStart).String() + sp.t.L().Printf("fingerprinting the destination tenant took %s", fingerprintingDuration) + return nil + }) - // timeout specifies when the roachtest should fail due to timeout. - timeout time.Duration + // If the goroutine gets cancelled or fataled, return before comparing fingerprints. 
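// Editor's note (sketch, not part of the patch): with hypothetical values
// substituted, the fingerprint query issued to both clusters looks roughly
// like:
//
//	SELECT *
//	FROM crdb_internal.fingerprint(crdb_internal.tenant_span($1::INT),
//	    '2023-01-02 15:04:05.123456'::TIMESTAMPTZ, true)   -- retained_time, microsecond precision
//	AS OF SYSTEM TIME '<cutover time as an HLC decimal>'
//
// It runs once on the source (bound to src.ID) and once on the destination
// (bound to dst.ID); the test then requires the two fingerprints to be equal.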
+ require.NoError(sp.t, fingerPrintMonitor.WaitE()) + require.Equal(sp.t, srcFingerprint, destFingerprint) } func registerClusterToCluster(r registry.Registry) { @@ -435,6 +603,29 @@ func registerClusterToCluster(r registry.Registry) { additionalDuration: 10 * time.Minute, cutover: 5 * time.Minute, }, + { + name: "c2c/UnitTest", + srcNodes: 1, + dstNodes: 1, + cpus: 4, + pdSize: 10, + workload: replicateKV{readPercent: 0, debugRunDurationMinutes: 1}, + timeout: 20 * time.Minute, + additionalDuration: 0 * time.Minute, + cutover: 30 * time.Second, + skip: "for local ad hoc testing", + }, + { + name: "c2c/BulkOps", + srcNodes: 4, + dstNodes: 4, + cpus: 8, + pdSize: 500, + workload: replicateBulkOps{}, + timeout: 4 * time.Hour, + additionalDuration: 0, + cutover: 5 * time.Minute, + }, } { sp := sp clusterOps := make([]spec.Option, 0) @@ -448,93 +639,93 @@ func registerClusterToCluster(r registry.Registry) { Owner: registry.OwnerDisasterRecovery, Cluster: r.MakeClusterSpec(sp.dstNodes+sp.srcNodes+1, clusterOps...), Timeout: sp.timeout, + Skip: sp.skip, RequiresLicense: true, Run: func(ctx context.Context, t test.Test, c cluster.Cluster) { - setup := setupC2C(ctx, t, c, sp.srcNodes, sp.dstNodes) - m := c.NewMonitor(ctx, setup.src.nodes.Merge(setup.dst.nodes)) - metricSnapper := setup.startStatsCollection(ctx, t, c) - if initCmd := sp.workload.sourceInitCmd(setup.src.name, setup.src.nodes); initCmd != "" { - t.Status("populating source cluster before replication") - initStart := timeutil.Now() - c.Run(ctx, setup.workloadNode, initCmd) - t.L().Printf("src cluster workload initialization took %s minutes", timeutil.Since(initStart).Minutes()) - } + sp.setupC2C(ctx, t, c) + m := c.NewMonitor(ctx, sp.setup.src.nodes.Merge(sp.setup.dst.nodes)) + hc := NewHealthChecker(t, c, sp.setup.src.nodes.Merge(sp.setup.dst.nodes)) + + m.Go(func(ctx context.Context) error { + require.NoError(t, hc.Runner(ctx)) + return nil + }) + defer hc.Done() + + metricSnapper := sp.startStatsCollection(ctx) + sp.preStreamingWorkload(ctx) + t.L().Printf("begin workload on src cluster") + // The roachtest driver can use the workloadCtx to cancel the workload. workloadCtx, workloadCancel := context.WithCancel(ctx) defer workloadCancel() + workloadDoneCh := make(chan struct{}) m.Go(func(ctx context.Context) error { - err := c.RunE(workloadCtx, setup.workloadNode, - sp.workload.sourceRunCmd(setup.src.name, setup.src.nodes)) - // The workload should only stop if the workloadCtx is cancelled once - // sp.additionalDuration has elapsed after the initial scan completes. - if workloadCtx.Err() == nil { - // Implies the workload context was not cancelled and the workload cmd returned on - // its own. + defer close(workloadDoneCh) + err := sp.runWorkload(workloadCtx) + // The workload should only return an error if the roachtest driver cancels the + // workloadCtx after sp.additionalDuration has elapsed after the initial scan completes. + if err != nil && workloadCtx.Err() == nil { + // Implies the workload context was not cancelled and the workload cmd returned a + // different error. return errors.Wrapf(err, `Workload context was not cancelled. Error returned by workload cmd`) } - workloadDoneCh <- struct{}{} + t.L().Printf("workload successfully finished") return nil }) t.Status("starting replication stream") - setup.metrics.initalScanStart = newMetricSnapshot(metricSnapper, timeutil.Now()) - - // There's no need to remove the tenant limiters for this new app tenant, as - // all replication traffic flows through the system tenant. 
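// Editor's note (sketch, not part of the patch): after this refactor the Run
// closure is a thin driver over methods on *replicationTestSpec, roughly:
//
//	sp.setupC2C(ctx, t, c)                        // provision src/dst clusters and tenants
//	metricSnapper := sp.startStatsCollection(ctx)
//	sp.preStreamingWorkload(ctx)                  // e.g. tpcc import; no-op for kv and bulk ops
//	m.Go(func(ctx context.Context) error { return sp.runWorkload(workloadCtx) })
//	ingestionJobID := sp.startReplicationStream()
//	sp.waitForHighWatermark(ingestionJobID, sp.timeout/2)
//	// ...run for additionalDuration (or until the workload finishes on its own),
//	// choose cutoverTime = clock_timestamp() - sp.cutover, then:
//	sp.stopReplicationStream(ingestionJobID, cutoverTime)
//	sp.compareTenantFingerprintsAtTimestamp(ctx, retainedTime, cutoverTime)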
- streamReplStmt := fmt.Sprintf("CREATE TENANT %q FROM REPLICATION OF %q ON '%s'", - setup.dst.name, setup.src.name, setup.src.pgURL) - setup.dst.sysSQL.Exec(t, streamReplStmt) - ingestionJobID := getIngestionJobID(t, setup.dst.sysSQL, setup.dst.name) - - // TODO(ssd): The job doesn't record the initial - // statement time, so we can't correctly measure the - // initial scan time here. + sp.metrics.initalScanStart = newMetricSnapshot(metricSnapper, timeutil.Now()) + ingestionJobID := sp.startReplicationStream() + lv := makeLatencyVerifier("stream-ingestion", 0, 2*time.Minute, t.L(), getStreamIngestionJobInfo, t.Status, false) defer lv.maybeLogLatencyHist() m.Go(func(ctx context.Context) error { - return lv.pollLatency(ctx, setup.dst.db, ingestionJobID, time.Second, workloadDoneCh) + return lv.pollLatency(ctx, sp.setup.dst.db, ingestionJobID, time.Second, workloadDoneCh) }) t.L().Printf("waiting for replication stream to finish ingesting initial scan") - waitForHighWatermark(t, setup.dst.db, ingestionJobID, sp.timeout/2) - setup.metrics.initialScanEnd = newMetricSnapshot(metricSnapper, timeutil.Now()) + sp.waitForHighWatermark(ingestionJobID, sp.timeout/2) + sp.metrics.initialScanEnd = newMetricSnapshot(metricSnapper, timeutil.Now()) t.Status(fmt.Sprintf(`initial scan complete. run workload and repl. stream for another %s minutes`, sp.additionalDuration)) - var currentTime time.Time - setup.dst.sysSQL.QueryRow(t, "SELECT clock_timestamp()").Scan(¤tTime) - cutoverTime := currentTime.Add(sp.additionalDuration - sp.cutover) - t.Status("cutover time chosen: ", cutoverTime.String()) - select { - case <-time.After(sp.additionalDuration): + case <-workloadDoneCh: + t.L().Printf("workload finished on its own") + case <-time.After(sp.getWorkloadTimeout()): workloadCancel() - t.L().Printf("workload has finished after %s", sp.additionalDuration) + t.L().Printf("workload has cancelled after %s", sp.additionalDuration) case <-ctx.Done(): t.L().Printf(`roachtest context cancelled while waiting for workload duration to complete`) return } - t.Status(fmt.Sprintf("waiting for replication stream to cutover to %s", cutoverTime.String())) - retainedTime := getReplicationRetainedTime(t, setup.dst.sysSQL, roachpb.TenantName(setup.dst.name)) - setup.metrics.cutoverTo = newMetricSnapshot(metricSnapper, cutoverTime) - setup.metrics.cutoverStart = newMetricSnapshot(metricSnapper, timeutil.Now()) - stopReplicationStream(t, setup.dst.sysSQL, ingestionJobID, cutoverTime) - setup.metrics.cutoverEnd = newMetricSnapshot(metricSnapper, timeutil.Now()) + var currentTime time.Time + sp.setup.dst.sysSQL.QueryRow(sp.t, "SELECT clock_timestamp()").Scan(¤tTime) + cutoverTime := currentTime.Add(-sp.cutover) + sp.t.Status("cutover time chosen: ", cutoverTime.String()) + + retainedTime := sp.getReplicationRetainedTime() + + sp.metrics.cutoverTo = newMetricSnapshot(metricSnapper, cutoverTime) + sp.metrics.cutoverStart = newMetricSnapshot(metricSnapper, timeutil.Now()) - setup.metrics.export(t, len(setup.src.nodes)) + sp.t.Status(fmt.Sprintf("waiting for replication stream to cutover to %s", + cutoverTime.String())) + sp.stopReplicationStream(ingestionJobID, cutoverTime) + sp.metrics.cutoverEnd = newMetricSnapshot(metricSnapper, timeutil.Now()) + + sp.metrics.export(sp.t, len(sp.setup.src.nodes)) t.Status("comparing fingerprints") - compareTenantFingerprintsAtTimestamp( - t, - m, - setup, + sp.compareTenantFingerprintsAtTimestamp( + ctx, retainedTime, cutoverTime, ) lv.assertValid(t) - }, }) } @@ -548,44 +739,6 @@ func 
getIngestionJobID(t test.Test, dstSQL *sqlutils.SQLRunner, dstTenantName st return int(tenantInfo.TenantReplicationJobID) } -func compareTenantFingerprintsAtTimestamp( - t test.Test, m cluster.Monitor, setup *c2cSetup, startTime, endTime time.Time, -) { - t.Status(fmt.Sprintf("comparing tenant fingerprints between start time %s and end time %s", startTime.UTC(), endTime.UTC())) - - // TODO(adityamaru,lidorcarmel): Once we agree on the format and precision we - // display all user facing timestamps with, we should revisit how we format - // the start time to ensure we are fingerprinting from the most accurate lower - // bound. - microSecondRFC3339Format := "2006-01-02 15:04:05.999999" - startTimeStr := startTime.Format(microSecondRFC3339Format) - aost := hlc.Timestamp{WallTime: endTime.UnixNano()}.AsOfSystemTime() - fingerprintQuery := fmt.Sprintf(` -SELECT * -FROM crdb_internal.fingerprint(crdb_internal.tenant_span($1::INT), '%s'::TIMESTAMPTZ, true) -AS OF SYSTEM TIME '%s'`, startTimeStr, aost) - - var srcFingerprint int64 - m.Go(func(ctx context.Context) error { - setup.src.sysSQL.QueryRow(t, fingerprintQuery, setup.src.ID).Scan(&srcFingerprint) - return nil - }) - var destFingerprint int64 - m.Go(func(ctx context.Context) error { - // TODO(adityamaru): Measure and record fingerprinting throughput. - setup.metrics.fingerprintingStart = timeutil.Now() - setup.dst.sysSQL.QueryRow(t, fingerprintQuery, setup.dst.ID).Scan(&destFingerprint) - setup.metrics.fingerprintingEnd = timeutil.Now() - fingerprintingDuration := setup.metrics.fingerprintingEnd.Sub(setup.metrics.fingerprintingStart).String() - t.L().Printf("fingerprinting the destination tenant took %s", fingerprintingDuration) - return nil - }) - - // If the goroutine gets cancelled or fataled, return before comparing fingerprints. - require.NoError(t, m.WaitE()) - require.Equal(t, srcFingerprint, destFingerprint) -} - type streamIngesitonJobInfo struct { status string errMsg string @@ -630,54 +783,6 @@ func getStreamIngestionJobInfo(db *gosql.DB, jobID int) (jobInfo, error) { }, nil } -func waitForHighWatermark(t test.Test, db *gosql.DB, ingestionJobID int, wait time.Duration) { - testutils.SucceedsWithin(t, func() error { - info, err := getStreamIngestionJobInfo(db, ingestionJobID) - if err != nil { - return err - } - if info.GetHighWater().IsZero() { - return errors.New("no high watermark") - } - return nil - }, wait) -} - -// getReplicationRetainedTime returns the `retained_time` of the replication -// job. 
-func getReplicationRetainedTime( - t test.Test, destSQL *sqlutils.SQLRunner, destTenantName roachpb.TenantName, -) time.Time { - var retainedTime time.Time - destSQL.QueryRow(t, `SELECT retained_time FROM [SHOW TENANT $1 WITH REPLICATION STATUS]`, - destTenantName).Scan(&retainedTime) - return retainedTime -} - -func stopReplicationStream( - t test.Test, destSQL *sqlutils.SQLRunner, ingestionJob int, cutoverTime time.Time, -) { - destSQL.Exec(t, `SELECT crdb_internal.complete_stream_ingestion_job($1, $2)`, ingestionJob, cutoverTime) - err := retry.ForDuration(time.Minute*5, func() error { - var status string - var payloadBytes []byte - destSQL.QueryRow(t, `SELECT status, payload FROM system.jobs WHERE id = $1`, - ingestionJob).Scan(&status, &payloadBytes) - if jobs.Status(status) == jobs.StatusFailed { - payload := &jobspb.Payload{} - if err := protoutil.Unmarshal(payloadBytes, payload); err == nil { - t.Fatalf("job failed: %s", payload.Error) - } - t.Fatalf("job failed") - } - if e, a := jobs.StatusSucceeded, jobs.Status(status); e != a { - return errors.Errorf("expected job status %s, but got %s", e, a) - } - return nil - }) - require.NoError(t, err) -} - func srcClusterSettings(t test.Test, db *sqlutils.SQLRunner) { db.ExecMultiple(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true;`) } diff --git a/pkg/cmd/roachtest/tests/multitenant_utils.go b/pkg/cmd/roachtest/tests/multitenant_utils.go index f1cf7a0ae4e8..04b41ecfd30f 100644 --- a/pkg/cmd/roachtest/tests/multitenant_utils.go +++ b/pkg/cmd/roachtest/tests/multitenant_utils.go @@ -300,7 +300,7 @@ func createTenantAdminRole(t test.Test, tenantName string, tenantSQL *sqlutils.S } // createInMemoryTenant runs through the necessary steps to create an in-memory tenant without -// resource limits. +// resource limits and full dbconsole viewing privileges. func createInMemoryTenant( ctx context.Context, t test.Test, @@ -312,6 +312,7 @@ func createInMemoryTenant( sysSQL := sqlutils.MakeSQLRunner(c.Conn(ctx, t.L(), nodes.RandNode()[0])) sysSQL.Exec(t, "CREATE TENANT $1", tenantName) sysSQL.Exec(t, "ALTER TENANT $1 START SERVICE SHARED", tenantName) + sysSQL.Exec(t, `ALTER TENANT $1 GRANT CAPABILITY can_view_node_info=true, can_admin_split=true,can_view_tsdb_metrics=true`, tenantName) removeTenantRateLimiters(t, sysSQL, tenantName)
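// Editor's note (sketch, not part of the patch): with this change,
// createInMemoryTenant now performs the equivalent of:
//
//	CREATE TENANT $1;
//	ALTER TENANT $1 START SERVICE SHARED;
//	ALTER TENANT $1 GRANT CAPABILITY can_view_node_info=true, can_admin_split=true, can_view_tsdb_metrics=true;
//
// followed by removeTenantRateLimiters, so in-memory tenants come up without
// resource limits and with the capabilities the tests and DB Console views
// rely on (node info, admin split, tsdb metrics).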