roachtest: add a c2c cutover TO LATEST test
We only have c2c roachtests that cut over to the past; this adds one
that cuts over to LATEST. It uses the `TO LATEST` SQL syntax because we
expect that form to be used more in production.

Epic: none

Release note: None
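
For reference, the two cutover forms the test now exercises appear verbatim in the diff below. Here is a minimal standalone sketch of them, assuming a `*sql.DB` connected to the destination cluster's system tenant; the package name, helper name, and error handling are illustrative, not part of the commit:

```go
package cutoversketch

import (
	"database/sql"
	"fmt"
)

// completeCutover issues the cutover statement against the destination
// cluster. An empty cutoverTime means cut over TO LATEST; otherwise the
// tenant is cut over to the given historical timestamp. Per the diff below,
// both forms return the resolved cutover timestamp as a DECIMAL, which we
// scan as a string.
func completeCutover(db *sql.DB, tenant, cutoverTime string) (string, error) {
	var cutoverStr string
	var err error
	if cutoverTime == "" {
		err = db.QueryRow(
			`ALTER TENANT $1 COMPLETE REPLICATION TO LATEST`, tenant,
		).Scan(&cutoverStr)
	} else {
		err = db.QueryRow(
			`ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`,
			tenant, cutoverTime,
		).Scan(&cutoverStr)
	}
	if err != nil {
		return "", fmt.Errorf("cutover of tenant %s failed: %w", tenant, err)
	}
	return cutoverStr, nil
}
```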
lidorcarmel committed Aug 17, 2023
1 parent 58e9d7d commit 331cd87
Showing 2 changed files with 47 additions and 27 deletions.
pkg/cmd/roachtest/tests/BUILD.bazel (1 addition, 0 deletions)
@@ -261,6 +261,7 @@ go_library(
         "@com_github_aws_aws_sdk_go_v2_service_rds//:rds",
         "@com_github_aws_aws_sdk_go_v2_service_rds//types",
         "@com_github_aws_aws_sdk_go_v2_service_secretsmanager//:secretsmanager",
+        "@com_github_cockroachdb_apd_v3//:apd",
         "@com_github_cockroachdb_cockroach_go_v2//crdb",
         "@com_github_cockroachdb_errors//:errors",
         "@com_github_cockroachdb_ttycolor//:ttycolor",
pkg/cmd/roachtest/tests/cluster_to_cluster.go (46 additions, 27 deletions)
@@ -23,6 +23,7 @@ import (
 	"strings"
 	"time"
 
+	apd "github.com/cockroachdb/apd/v3"
 	"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationutils"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/clusterstats"
@@ -555,18 +556,34 @@ func (rd *replicationDriver) getWorkloadTimeout() time.Duration {
 
 // getReplicationRetainedTime returns the `retained_time` of the replication
 // job.
-func (rd *replicationDriver) getReplicationRetainedTime() time.Time {
+func (rd *replicationDriver) getReplicationRetainedTime() hlc.Timestamp {
 	var retainedTime time.Time
 	rd.setup.dst.sysSQL.QueryRow(rd.t,
 		`SELECT retained_time FROM [SHOW TENANT $1 WITH REPLICATION STATUS]`,
 		roachpb.TenantName(rd.setup.dst.name)).Scan(&retainedTime)
-	return retainedTime
+	return hlc.Timestamp{WallTime: retainedTime.UnixNano()}
 }
 
+func DecimalTimeToHLC(t test.Test, s string) hlc.Timestamp {
+	d, _, err := apd.NewFromString(s)
+	require.NoError(t, err)
+	ts, err := hlc.DecimalToHLC(d)
+	require.NoError(t, err)
+	return ts
+}
+
 func (rd *replicationDriver) stopReplicationStream(
 	ctx context.Context, ingestionJob int, cutoverTime time.Time,
-) {
-	rd.setup.dst.sysSQL.Exec(rd.t, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`, rd.setup.dst.name, cutoverTime)
+) (actualCutoverTime hlc.Timestamp) {
+	var cutoverStr string
+	if cutoverTime.IsZero() {
+		rd.setup.dst.sysSQL.QueryRow(rd.t, `ALTER TENANT $1 COMPLETE REPLICATION TO LATEST`,
+			rd.setup.dst.name).Scan(&cutoverStr)
+	} else {
+		rd.setup.dst.sysSQL.QueryRow(rd.t, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`,
+			rd.setup.dst.name, cutoverTime).Scan(&cutoverStr)
+	}
+	actualCutoverTime = DecimalTimeToHLC(rd.t, cutoverStr)
 	err := retry.ForDuration(rd.rs.cutoverTimeout, func() error {
 		var status string
 		var payloadBytes []byte
@@ -590,20 +607,18 @@ func (rd *replicationDriver) stopReplicationStream(
 		return nil
 	})
 	require.NoError(rd.t, err)
+	return actualCutoverTime
 }
 
 func (rd *replicationDriver) compareTenantFingerprintsAtTimestamp(
-	ctx context.Context, startTime, endTime time.Time,
+	ctx context.Context, startTime, endTime hlc.Timestamp,
 ) {
 	rd.t.Status(fmt.Sprintf("comparing tenant fingerprints between start time %s and end time %s",
-		startTime.UTC(), endTime.UTC()))
-
-	startTimeDecimal := hlc.Timestamp{WallTime: startTime.UnixNano()}.AsOfSystemTime()
-	aost := hlc.Timestamp{WallTime: endTime.UnixNano()}.AsOfSystemTime()
+		startTime, endTime))
 	fingerprintQuery := fmt.Sprintf(`
 SELECT *
 FROM crdb_internal.fingerprint(crdb_internal.tenant_span($1::INT), '%s'::DECIMAL, true)
-AS OF SYSTEM TIME '%s'`, startTimeDecimal, aost)
+AS OF SYSTEM TIME '%s'`, startTime.AsOfSystemTime(), endTime.AsOfSystemTime())
 
 	var srcFingerprint int64
 	fingerPrintMonitor := rd.newMonitor(ctx)
@@ -624,14 +639,12 @@ AS OF SYSTEM TIME '%s'`, startTimeDecimal, aost)
 	// If the goroutine gets cancelled or fataled, return before comparing fingerprints.
 	require.NoError(rd.t, fingerPrintMonitor.WaitE())
 	if srcFingerprint != destFingerprint {
-		startHlc := hlc.Timestamp{WallTime: startTime.UnixNano()}
-		endHlc := hlc.Timestamp{WallTime: endTime.UnixNano()}
 		rd.t.L().Printf("fingerprint mismatch: conducting table level fingerprints")
 		srcTenantConn := rd.c.Conn(ctx, rd.t.L(), 1, option.TenantName(rd.setup.src.name))
 		dstTenantConn := rd.c.Conn(ctx, rd.t.L(), rd.rs.srcNodes+1, option.TenantName(rd.setup.dst.name))
 		require.NoError(rd.t, replicationutils.InvestigateFingerprints(ctx, srcTenantConn, dstTenantConn,
-			startHlc,
-			endHlc))
+			startTime,
+			endTime))
 		rd.t.L().Printf("fingerprints by table seem to match")
 	}
 	require.Equal(rd.t, srcFingerprint, destFingerprint)
@@ -741,23 +754,28 @@ func (rd *replicationDriver) main(ctx context.Context) {
 		rd.t.L().Printf(`roachtest context cancelled while waiting for workload duration to complete`)
 		return
 	}
-	var currentTime time.Time
-	rd.setup.dst.sysSQL.QueryRow(rd.t, "SELECT clock_timestamp()").Scan(&currentTime)
-	cutoverTime := currentTime.Add(-rd.rs.cutover)
-	rd.t.Status("cutover time chosen: ", cutoverTime.String())
 
 	rd.checkParticipatingNodes(ingestionJobID)
 
 	retainedTime := rd.getReplicationRetainedTime()
-	require.GreaterOrEqual(rd.t, cutoverTime, retainedTime,
-		"cannot cutover to a time below the retained time (did the test already fail?)")
+	var cutoverTime time.Time
+	cutoverTo := "LATEST"
+	if rd.rs.cutover.Nanoseconds() != 0 {
+		var currentTime time.Time
+		rd.setup.dst.sysSQL.QueryRow(rd.t, "SELECT clock_timestamp()").Scan(&currentTime)
+		cutoverTime = currentTime.Add(-rd.rs.cutover)
+		rd.t.Status("cutover time chosen: ", cutoverTime.String())
+		cutoverTo = cutoverTime.String()
+		require.GreaterOrEqual(rd.t, cutoverTime, retainedTime.GoTime(),
+			"cannot cutover to a time below the retained time (did the test already fail?)")
+	}
 
-	rd.metrics.cutoverTo = newMetricSnapshot(metricSnapper, cutoverTime)
 	rd.metrics.cutoverStart = newMetricSnapshot(metricSnapper, timeutil.Now())
 
-	rd.t.Status(fmt.Sprintf("waiting for replication stream to cutover to %s",
-		cutoverTime.String()))
-	rd.stopReplicationStream(ctx, ingestionJobID, cutoverTime)
+	rd.t.Status(fmt.Sprintf("waiting for replication stream to cutover to %s", cutoverTo))
+	actualCutoverTime := rd.stopReplicationStream(ctx, ingestionJobID, cutoverTime)
+
+	rd.metrics.cutoverTo = newMetricSnapshot(metricSnapper, actualCutoverTime.GoTime())
 	rd.metrics.cutoverEnd = newMetricSnapshot(metricSnapper, timeutil.Now())
 
 	rd.metrics.export(rd.t, len(rd.setup.src.nodes))
@@ -766,7 +784,7 @@ func (rd *replicationDriver) main(ctx context.Context) {
 	rd.compareTenantFingerprintsAtTimestamp(
 		ctx,
 		retainedTime,
-		cutoverTime,
+		actualCutoverTime,
 	)
 	lv.assertValid(rd.t)
 }
@@ -820,7 +838,8 @@ func runAcceptanceClusterReplication(ctx context.Context, t test.Test, c cluster
 func registerClusterToCluster(r registry.Registry) {
 	for _, sp := range []replicationSpec{
 		{
-			name:      "c2c/tpcc/warehouses=500/duration=10/cutover=5",
+			// Cutover TO LATEST:
+			name:      "c2c/tpcc/warehouses=500/duration=10/cutover=0",
 			benchmark: true,
 			srcNodes:  4,
 			dstNodes:  4,
@@ -833,7 +852,7 @@ workload: replicateTPCC{warehouses: 500},
 			workload:           replicateTPCC{warehouses: 500},
 			timeout:            1 * time.Hour,
 			additionalDuration: 10 * time.Minute,
-			cutover:            5 * time.Minute,
+			cutover:            0,
 		},
 		{
 			name: "c2c/tpcc/warehouses=1000/duration=60/cutover=30",
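
For context on the new `DecimalTimeToHLC` helper: CockroachDB renders HLC timestamps as decimal strings whose integer part is the wall time in nanoseconds since the Unix epoch and whose fractional part (ten digits in `AsOfSystemTime` output) is the logical counter. Below is a rough, dependency-free sketch of that decoding, under that encoding assumption; the real helper delegates to the `apd` decimal library and `hlc.DecimalToHLC`:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// hlcTimestamp mirrors the two components of an HLC timestamp.
type hlcTimestamp struct {
	WallTime int64 // nanoseconds since the Unix epoch
	Logical  int32 // in-tick logical counter
}

// decimalToHLC decodes a decimal-rendered HLC timestamp such as
// "1692288000000000000.0000000002". Illustration only, not the real
// hlc.DecimalToHLC implementation.
func decimalToHLC(s string) (hlcTimestamp, error) {
	wallPart, logicalPart, _ := strings.Cut(s, ".")
	wall, err := strconv.ParseInt(wallPart, 10, 64)
	if err != nil {
		return hlcTimestamp{}, fmt.Errorf("bad wall time %q: %w", wallPart, err)
	}
	var logical int64
	if logicalPart != "" {
		// The logical component is a fixed-width ten-digit fractional field;
		// pad short inputs and read the first ten digits.
		for len(logicalPart) < 10 {
			logicalPart += "0"
		}
		logical, err = strconv.ParseInt(logicalPart[:10], 10, 32)
		if err != nil {
			return hlcTimestamp{}, fmt.Errorf("bad logical part %q: %w", logicalPart, err)
		}
	}
	return hlcTimestamp{WallTime: wall, Logical: int32(logical)}, nil
}

func main() {
	ts, err := decimalToHLC("1692288000000000000.0000000002")
	if err != nil {
		panic(err)
	}
	fmt.Printf("wall=%d logical=%d\n", ts.WallTime, ts.Logical)
}
```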
