diff --git a/pkg/cmd/roachtest/tests/BUILD.bazel b/pkg/cmd/roachtest/tests/BUILD.bazel
index 9a8d81b6c65f..012fd06d3bac 100644
--- a/pkg/cmd/roachtest/tests/BUILD.bazel
+++ b/pkg/cmd/roachtest/tests/BUILD.bazel
@@ -261,6 +261,7 @@ go_library(
         "@com_github_aws_aws_sdk_go_v2_service_rds//:rds",
         "@com_github_aws_aws_sdk_go_v2_service_rds//types",
         "@com_github_aws_aws_sdk_go_v2_service_secretsmanager//:secretsmanager",
+        "@com_github_cockroachdb_apd_v3//:apd",
         "@com_github_cockroachdb_cockroach_go_v2//crdb",
         "@com_github_cockroachdb_errors//:errors",
         "@com_github_cockroachdb_ttycolor//:ttycolor",
diff --git a/pkg/cmd/roachtest/tests/cluster_to_cluster.go b/pkg/cmd/roachtest/tests/cluster_to_cluster.go
index bb2bf7b2250a..0cfb5e702760 100644
--- a/pkg/cmd/roachtest/tests/cluster_to_cluster.go
+++ b/pkg/cmd/roachtest/tests/cluster_to_cluster.go
@@ -23,6 +23,7 @@ import (
 	"strings"
 	"time"
 
+	apd "github.com/cockroachdb/apd/v3"
 	"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationutils"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/clusterstats"
@@ -555,18 +556,34 @@ func (rd *replicationDriver) getWorkloadTimeout() time.Duration {
 
 // getReplicationRetainedTime returns the `retained_time` of the replication
 // job.
-func (rd *replicationDriver) getReplicationRetainedTime() time.Time {
+func (rd *replicationDriver) getReplicationRetainedTime() hlc.Timestamp {
 	var retainedTime time.Time
 	rd.setup.dst.sysSQL.QueryRow(rd.t,
 		`SELECT retained_time FROM [SHOW TENANT $1 WITH REPLICATION STATUS]`,
 		roachpb.TenantName(rd.setup.dst.name)).Scan(&retainedTime)
-	return retainedTime
+	return hlc.Timestamp{WallTime: retainedTime.UnixNano()}
+}
+
+func DecimalTimeToHLC(t test.Test, s string) hlc.Timestamp {
+	d, _, err := apd.NewFromString(s)
+	require.NoError(t, err)
+	ts, err := hlc.DecimalToHLC(d)
+	require.NoError(t, err)
+	return ts
 }
 
 func (rd *replicationDriver) stopReplicationStream(
 	ctx context.Context, ingestionJob int, cutoverTime time.Time,
-) {
-	rd.setup.dst.sysSQL.Exec(rd.t, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`, rd.setup.dst.name, cutoverTime)
+) (actualCutoverTime hlc.Timestamp) {
+	var cutoverStr string
+	if cutoverTime.IsZero() {
+		rd.setup.dst.sysSQL.QueryRow(rd.t, `ALTER TENANT $1 COMPLETE REPLICATION TO LATEST`,
+			rd.setup.dst.name).Scan(&cutoverStr)
+	} else {
+		rd.setup.dst.sysSQL.QueryRow(rd.t, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`,
+			rd.setup.dst.name, cutoverTime).Scan(&cutoverStr)
+	}
+	actualCutoverTime = DecimalTimeToHLC(rd.t, cutoverStr)
 	err := retry.ForDuration(rd.rs.cutoverTimeout, func() error {
 		var status string
 		var payloadBytes []byte
@@ -590,20 +607,18 @@ func (rd *replicationDriver) stopReplicationStream(
 		return nil
 	})
 	require.NoError(rd.t, err)
+	return actualCutoverTime
 }
 
 func (rd *replicationDriver) compareTenantFingerprintsAtTimestamp(
-	ctx context.Context, startTime, endTime time.Time,
+	ctx context.Context, startTime, endTime hlc.Timestamp,
 ) {
 	rd.t.Status(fmt.Sprintf("comparing tenant fingerprints between start time %s and end time %s",
-		startTime.UTC(), endTime.UTC()))
-
-	startTimeDecimal := hlc.Timestamp{WallTime: startTime.UnixNano()}.AsOfSystemTime()
-	aost := hlc.Timestamp{WallTime: endTime.UnixNano()}.AsOfSystemTime()
+		startTime, endTime))
 
 	fingerprintQuery := fmt.Sprintf(`
 SELECT *
 FROM crdb_internal.fingerprint(crdb_internal.tenant_span($1::INT), '%s'::DECIMAL, true)
-AS OF SYSTEM TIME '%s'`, startTimeDecimal, aost)
+AS OF SYSTEM TIME '%s'`, startTime.AsOfSystemTime(), endTime.AsOfSystemTime())
 
 	var srcFingerprint int64
 	fingerPrintMonitor := rd.newMonitor(ctx)
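A note on the DecimalTimeToHLC helper added above: the cutover statement returns the resolved timestamp as a decimal string, which apd parses and hlc.DecimalToHLC converts into an hlc.Timestamp. Below is a rough, stdlib-only sketch of that mapping, not the hlc package itself: hlcParts and parseDecimalHLC are invented for this note, and the assumed encoding is wall-clock nanoseconds in the integer part with the logical counter in the zero-padded fractional part, which is how CockroachDB renders decimal HLCs.

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// hlcParts is a hypothetical stand-in for hlc.Timestamp.
type hlcParts struct {
	WallTime int64 // nanoseconds since the Unix epoch
	Logical  int32 // ticks within a single wall-clock nanosecond
}

// parseDecimalHLC splits a decimal timestamp string such as
// "1699891200000000000.0000000002" into its wall and logical parts.
func parseDecimalHLC(s string) (hlcParts, error) {
	intPart, fracPart, _ := strings.Cut(s, ".")
	wall, err := strconv.ParseInt(intPart, 10, 64)
	if err != nil {
		return hlcParts{}, err
	}
	var logical int64
	if fracPart != "" {
		if logical, err = strconv.ParseInt(fracPart, 10, 32); err != nil {
			return hlcParts{}, err
		}
	}
	return hlcParts{WallTime: wall, Logical: int32(logical)}, nil
}

func main() {
	ts, err := parseDecimalHLC("1699891200000000000.0000000002")
	if err != nil {
		panic(err)
	}
	fmt.Printf("wall=%d logical=%d\n", ts.WallTime, ts.Logical) // wall=1699891200000000000 logical=2
}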
@@ -624,14 +639,12 @@ AS OF SYSTEM TIME '%s'`, startTimeDecimal, aost)
 	// If the goroutine gets cancelled or fataled, return before comparing fingerprints.
 	require.NoError(rd.t, fingerPrintMonitor.WaitE())
 	if srcFingerprint != destFingerprint {
-		startHlc := hlc.Timestamp{WallTime: startTime.UnixNano()}
-		endHlc := hlc.Timestamp{WallTime: endTime.UnixNano()}
 		rd.t.L().Printf("fingerprint mismatch: conducting table level fingerprints")
 		srcTenantConn := rd.c.Conn(ctx, rd.t.L(), 1, option.TenantName(rd.setup.src.name))
 		dstTenantConn := rd.c.Conn(ctx, rd.t.L(), rd.rs.srcNodes+1, option.TenantName(rd.setup.dst.name))
 		require.NoError(rd.t, replicationutils.InvestigateFingerprints(ctx, srcTenantConn, dstTenantConn,
-			startHlc,
-			endHlc))
+			startTime,
+			endTime))
 		rd.t.L().Printf("fingerprints by table seem to match")
 	}
 	require.Equal(rd.t, srcFingerprint, destFingerprint)
@@ -741,23 +754,28 @@ func (rd *replicationDriver) main(ctx context.Context) {
 		rd.t.L().Printf(`roachtest context cancelled while waiting for workload duration to complete`)
 		return
 	}
-	var currentTime time.Time
-	rd.setup.dst.sysSQL.QueryRow(rd.t, "SELECT clock_timestamp()").Scan(&currentTime)
-	cutoverTime := currentTime.Add(-rd.rs.cutover)
-	rd.t.Status("cutover time chosen: ", cutoverTime.String())
 
 	rd.checkParticipatingNodes(ingestionJobID)
 
 	retainedTime := rd.getReplicationRetainedTime()
-	require.GreaterOrEqual(rd.t, cutoverTime, retainedTime,
-		"cannot cutover to a time below the retained time (did the test already fail?)")
+	var cutoverTime time.Time
+	cutoverTo := "LATEST"
+	if rd.rs.cutover.Nanoseconds() != 0 {
+		var currentTime time.Time
+		rd.setup.dst.sysSQL.QueryRow(rd.t, "SELECT clock_timestamp()").Scan(&currentTime)
+		cutoverTime = currentTime.Add(-rd.rs.cutover)
+		rd.t.Status("cutover time chosen: ", cutoverTime.String())
+		cutoverTo = cutoverTime.String()
+		require.GreaterOrEqual(rd.t, cutoverTime, retainedTime.GoTime(),
+			"cannot cutover to a time below the retained time (did the test already fail?)")
+	}
 
-	rd.metrics.cutoverTo = newMetricSnapshot(metricSnapper, cutoverTime)
 	rd.metrics.cutoverStart = newMetricSnapshot(metricSnapper, timeutil.Now())
 
-	rd.t.Status(fmt.Sprintf("waiting for replication stream to cutover to %s",
-		cutoverTime.String()))
-	rd.stopReplicationStream(ctx, ingestionJobID, cutoverTime)
+	rd.t.Status(fmt.Sprintf("waiting for replication stream to cutover to %s", cutoverTo))
+	actualCutoverTime := rd.stopReplicationStream(ctx, ingestionJobID, cutoverTime)
+
+	rd.metrics.cutoverTo = newMetricSnapshot(metricSnapper, actualCutoverTime.GoTime())
 	rd.metrics.cutoverEnd = newMetricSnapshot(metricSnapper, timeutil.Now())
 
 	rd.metrics.export(rd.t, len(rd.setup.src.nodes))
@@ -766,7 +784,7 @@ func (rd *replicationDriver) main(ctx context.Context) {
 		rd.compareTenantFingerprintsAtTimestamp(
 			ctx,
 			retainedTime,
-			cutoverTime,
+			actualCutoverTime,
 		)
 		lv.assertValid(rd.t)
 	}
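The main() change above introduces a sentinel: a zero cutover duration now means cut over TO LATEST, and since the caller then has no timestamp in hand, stopReplicationStream returns the actual cutover time resolved by the statement. A minimal standalone sketch of just that dispatch follows; buildCutoverStmt and destTenant are hypothetical names written only for this note.

package main

import (
	"fmt"
	"time"
)

// buildCutoverStmt mirrors the branch in stopReplicationStream: the zero
// time.Time selects COMPLETE REPLICATION TO LATEST, anything else requests
// an explicit SYSTEM TIME.
func buildCutoverStmt(destTenant string, cutoverTime time.Time) (stmt string, args []interface{}) {
	if cutoverTime.IsZero() {
		return `ALTER TENANT $1 COMPLETE REPLICATION TO LATEST`, []interface{}{destTenant}
	}
	return `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`,
		[]interface{}{destTenant, cutoverTime}
}

func main() {
	for _, t := range []time.Time{{}, time.Now().Add(-5 * time.Minute)} {
		stmt, args := buildCutoverStmt("destTenant", t)
		fmt.Printf("%s %v\n", stmt, args)
	}
}

Either way the statement returns the resolved cutover timestamp, which is why the driver can fingerprint at the exact cutover point even in the TO LATEST case, where the timestamp is unknown up front.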
@@ -820,7 +838,8 @@ func runAcceptanceClusterReplication(ctx context.Context, t test.Test, c cluster
 func registerClusterToCluster(r registry.Registry) {
 	for _, sp := range []replicationSpec{
 		{
-			name:      "c2c/tpcc/warehouses=500/duration=10/cutover=5",
+			// Cutover TO LATEST:
+			name:      "c2c/tpcc/warehouses=500/duration=10/cutover=0",
 			benchmark: true,
 			srcNodes:  4,
 			dstNodes:  4,
 			workload:           replicateTPCC{warehouses: 500},
 			timeout:            1 * time.Hour,
 			additionalDuration: 10 * time.Minute,
-			cutover:            5 * time.Minute,
+			cutover:            0,
 		},
 		{
 			name: "c2c/tpcc/warehouses=1000/duration=60/cutover=30",
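One last illustration: with both endpoints now hlc.Timestamps, the fingerprint comparison reduces to rendering the two bounds in decimal form and splicing them into crdb_internal.fingerprint. The runnable sketch below covers just the query assembly, assuming the "%d.%010d" rendering that AsOfSystemTime() is understood to produce; renderAOST is a stand-in invented for this note.

package main

import "fmt"

// renderAOST mimics the decimal rendering of an HLC timestamp: wall-clock
// nanoseconds, a dot, then the logical component zero-padded to ten digits.
func renderAOST(wallNanos int64, logical int32) string {
	return fmt.Sprintf("%d.%010d", wallNanos, logical)
}

func main() {
	start := renderAOST(1699891200000000000, 0) // e.g. the retained_time
	end := renderAOST(1699891500000000000, 2)   // e.g. the actual cutover time
	query := fmt.Sprintf(`
SELECT *
FROM crdb_internal.fingerprint(crdb_internal.tenant_span($1::INT), '%s'::DECIMAL, true)
AS OF SYSTEM TIME '%s'`, start, end)
	fmt.Println(query)
}

Fingerprinting between retained_time and the actual cutover time keeps the comparison within the span of revisions the destination tenant is still guaranteed to retain.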