diff --git a/pkg/cmd/roachtest/tests/BUILD.bazel b/pkg/cmd/roachtest/tests/BUILD.bazel index 43a2464afc1f..1e80a9fc9573 100644 --- a/pkg/cmd/roachtest/tests/BUILD.bazel +++ b/pkg/cmd/roachtest/tests/BUILD.bazel @@ -210,6 +210,7 @@ go_library( "//pkg/ccl/changefeedccl", "//pkg/ccl/changefeedccl/cdctest", "//pkg/ccl/changefeedccl/changefeedbase", + "//pkg/ccl/crosscluster/replicationtestutils", "//pkg/ccl/crosscluster/replicationutils", "//pkg/ccl/storageccl/engineccl/enginepbccl", "//pkg/cli", diff --git a/pkg/cmd/roachtest/tests/logical_data_replication.go b/pkg/cmd/roachtest/tests/logical_data_replication.go index 5c03b14afd2a..68dac634cf65 100644 --- a/pkg/cmd/roachtest/tests/logical_data_replication.go +++ b/pkg/cmd/roachtest/tests/logical_data_replication.go @@ -18,6 +18,7 @@ import ( "sync" "time" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/replicationtestutils" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry" @@ -175,17 +176,22 @@ func registerLogicalDataReplicationTests(r registry.Registry) { func TestLDRBasic(ctx context.Context, t test.Test, c cluster.Cluster, setup multiClusterSetup) { duration := 15 * time.Minute + initRows := 1000 + maxBlockBytes := 1024 + if c.IsLocal() { - duration = 5 * time.Minute + duration = 30 * time.Second + initRows = 10 + maxBlockBytes = 32 } ldrWorkload := LDRWorkload{ workload: replicateKV{ readPercent: 0, debugRunDuration: duration, - maxBlockBytes: 1024, - initRows: 1000, - initWithSplitAndScatter: true}, + maxBlockBytes: maxBlockBytes, + initRows: initRows, + initWithSplitAndScatter: !c.IsLocal()}, dbName: "kv", tableName: "kv", } @@ -231,7 +237,7 @@ func TestLDRBasic(ctx context.Context, t test.Test, c cluster.Cluster, setup mul llv.assertValid(t) rlv.assertValid(t) - VerifyCorrectness(t, setup, leftJobID, rightJobID, 2*time.Minute, ldrWorkload) + VerifyCorrectness(ctx, c, t, setup, leftJobID, rightJobID, 2*time.Minute, ldrWorkload) } func TestLDRSchemaChange( @@ -307,7 +313,7 @@ func TestLDRSchemaChange( llv.assertValid(t) rlv.assertValid(t) - VerifyCorrectness(t, setup, leftJobID, rightJobID, 2*time.Minute, ldrWorkload) + VerifyCorrectness(ctx, c, t, setup, leftJobID, rightJobID, 2*time.Minute, ldrWorkload) } func TestLDRUpdateHeavy( @@ -371,7 +377,7 @@ func TestLDRUpdateHeavy( llv.assertValid(t) rlv.assertValid(t) - VerifyCorrectness(t, setup, leftJobID, rightJobID, 2*time.Minute, ldrWorkload) + VerifyCorrectness(ctx, c, t, setup, leftJobID, rightJobID, 2*time.Minute, ldrWorkload) } func TestLDROnNodeShutdown( @@ -463,7 +469,7 @@ func TestLDROnNodeShutdown( } monitor.Wait() - VerifyCorrectness(t, setup, leftJobID, rightJobID, 5*time.Minute, ldrWorkload) + VerifyCorrectness(ctx, c, t, setup, leftJobID, rightJobID, 5*time.Minute, ldrWorkload) } // TestLDROnNetworkPartition aims to see what happens when both clusters @@ -522,7 +528,7 @@ func TestLDROnNetworkPartition( t.L().Printf("Nodes reconnected. Waiting for workload to complete") monitor.Wait() - VerifyCorrectness(t, setup, leftJobID, rightJobID, 5*time.Minute, ldrWorkload) + VerifyCorrectness(ctx, c, t, setup, leftJobID, rightJobID, 5*time.Minute, ldrWorkload) } type ldrJobInfo struct { @@ -709,6 +715,8 @@ func setupLDR( } func VerifyCorrectness( + ctx context.Context, + c cluster.Cluster, t test.Test, setup multiClusterSetup, leftJobID, rightJobID int, @@ -720,15 +728,22 @@ func VerifyCorrectness( waitForReplicatedTimeToReachTimestamp(t, leftJobID, setup.left.db, getLogicalDataReplicationJobInfo, waitTime, now) waitForReplicatedTimeToReachTimestamp(t, rightJobID, setup.right.db, getLogicalDataReplicationJobInfo, waitTime, now) - - // TODO(ssd): Decide how we want to fingerprint - // this table while we are using in-row storage - // for crdb_internal_mvcc_timestamp. - var leftCount, rightCount int - selectQuery := fmt.Sprintf("SELECT count(1) FROM %s.%s", ldrWorkload.dbName, ldrWorkload.tableName) - setup.left.sysSQL.QueryRow(t, selectQuery).Scan(&leftCount) - setup.right.sysSQL.QueryRow(t, selectQuery).Scan(&rightCount) - require.Equal(t, leftCount, rightCount) + require.NoError(t, replicationtestutils.CheckEmptyDLQs(ctx, setup.left.db, ldrWorkload.dbName)) + require.NoError(t, replicationtestutils.CheckEmptyDLQs(ctx, setup.right.db, ldrWorkload.dbName)) + + m := c.NewMonitor(context.Background(), setup.CRDBNodes()) + var leftFingerprint, rightFingerprint [][]string + queryStmt := fmt.Sprintf("SHOW EXPERIMENTAL_FINGERPRINTS FROM TABLE %s.%s WITH EXCLUDE COLUMNS = ('crdb_replication_origin_timestamp')", ldrWorkload.dbName, ldrWorkload.tableName) + m.Go(func(ctx context.Context) error { + leftFingerprint = setup.left.sysSQL.QueryStr(t, queryStmt) + return nil + }) + m.Go(func(ctx context.Context) error { + rightFingerprint = setup.right.sysSQL.QueryStr(t, queryStmt) + return nil + }) + m.Wait() + require.Equal(t, leftFingerprint, rightFingerprint) } func getDebugZips(ctx context.Context, t test.Test, c cluster.Cluster, setup multiClusterSetup) {