diff --git a/pkg/cmd/roachtest/tests/cluster_to_cluster.go b/pkg/cmd/roachtest/tests/cluster_to_cluster.go index 6d29fe4d63f9..edc892c11cc2 100644 --- a/pkg/cmd/roachtest/tests/cluster_to_cluster.go +++ b/pkg/cmd/roachtest/tests/cluster_to_cluster.go @@ -244,9 +244,30 @@ type replicateKV struct { // the number of rows inserted into the cluster before c2c begins initRows int - // max size of raw data written during each insertion + // initWithSplitAndScatter splits and scatters the kv table after + // initialization. This should be set for tests that init little or no data. + // + // The kv workload can run split and scatter after the insertion step. After + // inserting a lot of kv table data (think multiple gb), however, kv has had + // enough time to split and scatter that table across the cluster, so there + // isn't a need to run an expensive scatter query. + // + // Initing a couple mb of kv table as other c2c tests do, however, does not + // lead kv to naturally split and scatter the table by the time the c2c job + // begins, which will lead the job to create a non-distributed initial dist + // sql plan because the source side dsp.PartitionSpans may only assign one + // node pair to stream all of the data. + initWithSplitAndScatter bool + + // maxBlockBytes indicates the maximum size of the kv payload value written + // during each insertion. The kv workload will randomly choose a value in the + // interval [1,maxBlockSize] with equal probability, i.e. via + // x~Uniform[1,maxBlockSize]. maxBlockBytes int + // maxQPS caps the queries per second sent to the source cluster after initialization. + maxQPS int + // partitionKVDatabaseInRegion constrains the kv database in the specified // region and asserts, before cutover, that the replicated span configuration // correctly enforces the regional constraint in the destination tenant. @@ -260,9 +281,11 @@ type replicateKV struct { func (kv replicateKV) sourceInitCmd(tenantName string, nodes option.NodeListOption) string { cmd := roachtestutil.NewCommand(`./workload init kv`). MaybeFlag(kv.initRows > 0, "insert-count", kv.initRows). + // Only set the max block byte values for the init command if we + // actually need to insert rows. MaybeFlag(kv.initRows > 0, "max-block-bytes", kv.maxBlockBytes). - Flag("splits", 100). - Option("scatter"). + MaybeFlag(kv.initWithSplitAndScatter, "splits", 100). + MaybeOption(kv.initWithSplitAndScatter, "scatter"). Arg("{pgurl%s:%s}", nodes, tenantName) return cmd.String() } @@ -273,6 +296,7 @@ func (kv replicateKV) sourceRunCmd(tenantName string, nodes option.NodeListOptio Flag("max-block-bytes", kv.maxBlockBytes). Flag("read-percent", kv.readPercent). MaybeFlag(kv.debugRunDuration > 0, "duration", kv.debugRunDuration). + MaybeFlag(kv.maxQPS > 0, "max-rate", kv.maxQPS). Arg("{pgurl%s:%s}", nodes, tenantName) return cmd.String() } @@ -394,6 +418,11 @@ type replicationSpec struct { // maxLatency override the maxAcceptedLatencyDefault. maxAcceptedLatency time.Duration + // TODO(msbutler): this knob only exists because the revision history + // fingerprint can encounter a gc ttl error for large fingerprints. Delete + // this knob once we lay a pts during fingerprinting. + nonRevisionHistoryFingerprint bool + // skipNodeDistributionCheck removes the requirement that multiple source and // destination nodes must participate in the replication stream. This should // be set if the roachtest runs on single node clusters or if the @@ -707,6 +736,13 @@ SELECT * FROM crdb_internal.fingerprint(crdb_internal.tenant_span($1::INT), '%s'::DECIMAL, true) AS OF SYSTEM TIME '%s'`, startTime.AsOfSystemTime(), endTime.AsOfSystemTime()) + if rd.rs.nonRevisionHistoryFingerprint { + fingerprintQuery = fmt.Sprintf(` +SELECT * +FROM crdb_internal.fingerprint(crdb_internal.tenant_span($1::INT), 0::DECIMAL, false) +AS OF SYSTEM TIME '%s'`, endTime.AsOfSystemTime()) + } + var srcFingerprint int64 fingerPrintMonitor := rd.newMonitor(ctx) fingerPrintMonitor.Go(func(ctx context.Context) error { @@ -982,7 +1018,7 @@ func runAcceptanceClusterReplication(ctx context.Context, t test.Test, c cluster dstNodes: 1, // The timeout field ensures the c2c roachtest driver behaves properly. timeout: 10 * time.Minute, - workload: replicateKV{readPercent: 0, debugRunDuration: 1 * time.Minute, maxBlockBytes: 1}, + workload: replicateKV{readPercent: 0, debugRunDuration: 1 * time.Minute, maxBlockBytes: 1, initWithSplitAndScatter: true}, additionalDuration: 0 * time.Minute, cutover: 30 * time.Second, skipNodeDistributionCheck: true, @@ -1035,13 +1071,17 @@ func registerClusterToCluster(r registry.Registry) { cutover: 30 * time.Minute, }, { - name: "c2c/kv0", - benchmark: true, - srcNodes: 3, - dstNodes: 3, - cpus: 8, - pdSize: 100, - workload: replicateKV{readPercent: 0, maxBlockBytes: 1024}, + name: "c2c/kv0", + benchmark: true, + srcNodes: 3, + dstNodes: 3, + cpus: 8, + pdSize: 100, + workload: replicateKV{ + readPercent: 0, + maxBlockBytes: 1024, + initWithSplitAndScatter: true, + }, timeout: 1 * time.Hour, additionalDuration: 10 * time.Minute, cutover: 5 * time.Minute, @@ -1064,6 +1104,28 @@ func registerClusterToCluster(r registry.Registry) { additionalDuration: 1 * time.Minute, cutover: 0, }, + { + // Large workload to test our 23.2 perf goals. + name: "c2c/weekly/kv50", + benchmark: true, + srcNodes: 8, + dstNodes: 8, + cpus: 8, + pdSize: 1000, + nonRevisionHistoryFingerprint: true, + + workload: replicateKV{ + // Write a ~2TB initial scan. + initRows: 350000000, + readPercent: 50, + maxBlockBytes: 4096, + maxQPS: 2000, + }, + timeout: 12 * time.Hour, + additionalDuration: 2 * time.Hour, + cutover: 0, + tags: registry.Tags("weekly", "aws-weekly"), + }, { name: "c2c/MultiRegion/SameRegions/kv0", benchmark: true, @@ -1074,6 +1136,7 @@ func registerClusterToCluster(r registry.Registry) { workload: replicateKV{ readPercent: 0, maxBlockBytes: 1024, + initWithSplitAndScatter: true, partitionKVDatabaseInRegion: "us-west1", antiRegion: "us-central1", }, @@ -1093,8 +1156,11 @@ func registerClusterToCluster(r registry.Registry) { dstNodes: 1, cpus: 4, pdSize: 10, - workload: replicateKV{readPercent: 0, debugRunDuration: 1 * time.Minute, - maxBlockBytes: 1024}, + workload: replicateKV{ + readPercent: 0, + debugRunDuration: 1 * time.Minute, + initWithSplitAndScatter: true, + maxBlockBytes: 1024}, timeout: 5 * time.Minute, additionalDuration: 0 * time.Minute, cutover: 30 * time.Second, @@ -1394,7 +1460,7 @@ func registerClusterReplicationResilience(r registry.Registry) { srcNodes: 4, dstNodes: 4, cpus: 8, - workload: replicateKV{readPercent: 0, initRows: 5000000, maxBlockBytes: 1024}, + workload: replicateKV{readPercent: 0, initRows: 5000000, maxBlockBytes: 1024, initWithSplitAndScatter: true}, timeout: 20 * time.Minute, additionalDuration: 6 * time.Minute, cutover: 3 * time.Minute, @@ -1507,7 +1573,7 @@ func registerClusterReplicationDisconnect(r registry.Registry) { srcNodes: 3, dstNodes: 3, cpus: 4, - workload: replicateKV{readPercent: 0, initRows: 1000000, maxBlockBytes: 1024}, + workload: replicateKV{readPercent: 0, initRows: 1000000, maxBlockBytes: 1024, initWithSplitAndScatter: true}, timeout: 20 * time.Minute, additionalDuration: 10 * time.Minute, cutover: 2 * time.Minute,