From 31ca8eb7a79b499efa3dc6946ac2ec47f2ce5217 Mon Sep 17 00:00:00 2001 From: Michael Butler Date: Fri, 13 Sep 2024 14:29:24 -0400 Subject: [PATCH 1/2] crosscluster/logical: add CheckEmptyDLQs test util Epic: none Release note: none --- .../logical/logical_replication_job_test.go | 5 ++- .../logical/udf_row_processor_test.go | 3 +- .../replicationtestutils/BUILD.bazel | 1 + .../replicationtestutils/logical.go | 44 +++++++++++++++++++ 4 files changed, 50 insertions(+), 3 deletions(-) create mode 100644 pkg/ccl/crosscluster/replicationtestutils/logical.go diff --git a/pkg/ccl/crosscluster/logical/logical_replication_job_test.go b/pkg/ccl/crosscluster/logical/logical_replication_job_test.go index e7d120174b69..bcdffc92ecec 100644 --- a/pkg/ccl/crosscluster/logical/logical_replication_job_test.go +++ b/pkg/ccl/crosscluster/logical/logical_replication_job_test.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/replicationtestutils" "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/replicationutils" "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/streamclient" _ "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/streamclient/randclient" @@ -697,7 +698,7 @@ func TestRandomTables(t *testing.T) { runnerB.QueryRow(t, streamStartStmt, dbAURL.String()).Scan(&jobBID) WaitUntilReplicatedTime(t, s.Clock().Now(), runnerB, jobBID) - + require.NoError(t, replicationtestutils.CheckEmptyDLQs(ctx, runnerB.DB, "b")) compareReplicatedTables(t, s, "a", "b", tableName, runnerA, runnerB) } @@ -829,7 +830,7 @@ func TestPreviouslyInterestingTables(t *testing.T) { runnerA.Exec(t, fmt.Sprintf("DELETE FROM %s LIMIT 5", tableName)) WaitUntilReplicatedTime(t, s.Clock().Now(), runnerB, jobBID) } - + require.NoError(t, replicationtestutils.CheckEmptyDLQs(ctx, runnerB.DB, "b")) compareReplicatedTables(t, s, "a", "b", tableName, runnerA, runnerB) }) } diff --git a/pkg/ccl/crosscluster/logical/udf_row_processor_test.go b/pkg/ccl/crosscluster/logical/udf_row_processor_test.go index acc8b13e1966..20ff16d031b7 100644 --- a/pkg/ccl/crosscluster/logical/udf_row_processor_test.go +++ b/pkg/ccl/crosscluster/logical/udf_row_processor_test.go @@ -13,6 +13,7 @@ import ( "fmt" "testing" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/replicationtestutils" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/sql/randgen" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -103,7 +104,7 @@ func TestUDFWithRandomTables(t *testing.T) { WaitUntilReplicatedTime(t, s.Clock().Now(), runnerB, jobBID) runnerA.Exec(t, fmt.Sprintf("DELETE FROM %s LIMIT 5", tableName)) WaitUntilReplicatedTime(t, s.Clock().Now(), runnerB, jobBID) - + require.NoError(t, replicationtestutils.CheckEmptyDLQs(ctx, runnerB.DB, "b")) compareReplicatedTables(t, s, "a", "b", tableName, runnerA, runnerB) } diff --git a/pkg/ccl/crosscluster/replicationtestutils/BUILD.bazel b/pkg/ccl/crosscluster/replicationtestutils/BUILD.bazel index 8fe1bcd023e1..bd2e52157789 100644 --- a/pkg/ccl/crosscluster/replicationtestutils/BUILD.bazel +++ b/pkg/ccl/crosscluster/replicationtestutils/BUILD.bazel @@ -5,6 +5,7 @@ go_library( testonly = 1, srcs = [ "encoding.go", + "logical.go", "replication_helpers.go", "span_config_helpers.go", "testutils.go", diff --git a/pkg/ccl/crosscluster/replicationtestutils/logical.go b/pkg/ccl/crosscluster/replicationtestutils/logical.go new file mode 100644 index 000000000000..bf55355bd88d --- /dev/null +++ b/pkg/ccl/crosscluster/replicationtestutils/logical.go @@ -0,0 +1,44 @@ +// Copyright 2024 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package replicationtestutils + +import ( + "context" + "fmt" + + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/errors" +) + +func CheckEmptyDLQs(ctx context.Context, db sqlutils.DBHandle, dbName string) error { + dlqNameQuery := fmt.Sprintf("SELECT table_name FROM [SHOW TABLES FROM %s] where schema_name = 'crdb_replication'", dbName) + rows, err := db.QueryContext(ctx, dlqNameQuery) + if err != nil { + return errors.Wrapf(err, "failed to query dlq table name for database %s", dbName) + } + defer rows.Close() + + var dlqTableName string + var dlqRowCount int + for rows.Next() { + if err := rows.Scan(&dlqTableName); err != nil { + return errors.Wrapf(err, "failed to scan dlq table name for database %s", dbName) + } + if err := db.QueryRowContext(ctx, fmt.Sprintf("SELECT count(*) FROM %s.crdb_replication.%s", dbName, dlqTableName)).Scan(&dlqRowCount); err != nil { + return err + } + if dlqRowCount != 0 { + return fmt.Errorf("expected DLQ to be empty, but found %d rows", dlqRowCount) + } + } + if dlqTableName == "" { + return errors.Newf("didn't find any any dlq tables in database %s", dbName) + } + return nil +} From 4f71ca6f3f28dd2d47c102d52f8b7d04c4333f5e Mon Sep 17 00:00:00 2001 From: Michael Butler Date: Fri, 13 Sep 2024 19:16:32 -0400 Subject: [PATCH 2/2] roachtest: fingerprint at end of LDR roachtest This patch adds a check that the dlq table is empty at the end of the roachtest and fingerprints the replicating tables. Epic: none Release note: none --- pkg/cmd/roachtest/tests/BUILD.bazel | 1 + .../tests/logical_data_replication.go | 51 ++++++++++++------- 2 files changed, 34 insertions(+), 18 deletions(-) diff --git a/pkg/cmd/roachtest/tests/BUILD.bazel b/pkg/cmd/roachtest/tests/BUILD.bazel index 43a2464afc1f..1e80a9fc9573 100644 --- a/pkg/cmd/roachtest/tests/BUILD.bazel +++ b/pkg/cmd/roachtest/tests/BUILD.bazel @@ -210,6 +210,7 @@ go_library( "//pkg/ccl/changefeedccl", "//pkg/ccl/changefeedccl/cdctest", "//pkg/ccl/changefeedccl/changefeedbase", + "//pkg/ccl/crosscluster/replicationtestutils", "//pkg/ccl/crosscluster/replicationutils", "//pkg/ccl/storageccl/engineccl/enginepbccl", "//pkg/cli", diff --git a/pkg/cmd/roachtest/tests/logical_data_replication.go b/pkg/cmd/roachtest/tests/logical_data_replication.go index 5c03b14afd2a..68dac634cf65 100644 --- a/pkg/cmd/roachtest/tests/logical_data_replication.go +++ b/pkg/cmd/roachtest/tests/logical_data_replication.go @@ -18,6 +18,7 @@ import ( "sync" "time" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/replicationtestutils" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry" @@ -175,17 +176,22 @@ func registerLogicalDataReplicationTests(r registry.Registry) { func TestLDRBasic(ctx context.Context, t test.Test, c cluster.Cluster, setup multiClusterSetup) { duration := 15 * time.Minute + initRows := 1000 + maxBlockBytes := 1024 + if c.IsLocal() { - duration = 5 * time.Minute + duration = 30 * time.Second + initRows = 10 + maxBlockBytes = 32 } ldrWorkload := LDRWorkload{ workload: replicateKV{ readPercent: 0, debugRunDuration: duration, - maxBlockBytes: 1024, - initRows: 1000, - initWithSplitAndScatter: true}, + maxBlockBytes: maxBlockBytes, + initRows: initRows, + initWithSplitAndScatter: !c.IsLocal()}, dbName: "kv", tableName: "kv", } @@ -231,7 +237,7 @@ func TestLDRBasic(ctx context.Context, t test.Test, c cluster.Cluster, setup mul llv.assertValid(t) rlv.assertValid(t) - VerifyCorrectness(t, setup, leftJobID, rightJobID, 2*time.Minute, ldrWorkload) + VerifyCorrectness(ctx, c, t, setup, leftJobID, rightJobID, 2*time.Minute, ldrWorkload) } func TestLDRSchemaChange( @@ -307,7 +313,7 @@ func TestLDRSchemaChange( llv.assertValid(t) rlv.assertValid(t) - VerifyCorrectness(t, setup, leftJobID, rightJobID, 2*time.Minute, ldrWorkload) + VerifyCorrectness(ctx, c, t, setup, leftJobID, rightJobID, 2*time.Minute, ldrWorkload) } func TestLDRUpdateHeavy( @@ -371,7 +377,7 @@ func TestLDRUpdateHeavy( llv.assertValid(t) rlv.assertValid(t) - VerifyCorrectness(t, setup, leftJobID, rightJobID, 2*time.Minute, ldrWorkload) + VerifyCorrectness(ctx, c, t, setup, leftJobID, rightJobID, 2*time.Minute, ldrWorkload) } func TestLDROnNodeShutdown( @@ -463,7 +469,7 @@ func TestLDROnNodeShutdown( } monitor.Wait() - VerifyCorrectness(t, setup, leftJobID, rightJobID, 5*time.Minute, ldrWorkload) + VerifyCorrectness(ctx, c, t, setup, leftJobID, rightJobID, 5*time.Minute, ldrWorkload) } // TestLDROnNetworkPartition aims to see what happens when both clusters @@ -522,7 +528,7 @@ func TestLDROnNetworkPartition( t.L().Printf("Nodes reconnected. Waiting for workload to complete") monitor.Wait() - VerifyCorrectness(t, setup, leftJobID, rightJobID, 5*time.Minute, ldrWorkload) + VerifyCorrectness(ctx, c, t, setup, leftJobID, rightJobID, 5*time.Minute, ldrWorkload) } type ldrJobInfo struct { @@ -709,6 +715,8 @@ func setupLDR( } func VerifyCorrectness( + ctx context.Context, + c cluster.Cluster, t test.Test, setup multiClusterSetup, leftJobID, rightJobID int, @@ -720,15 +728,22 @@ func VerifyCorrectness( waitForReplicatedTimeToReachTimestamp(t, leftJobID, setup.left.db, getLogicalDataReplicationJobInfo, waitTime, now) waitForReplicatedTimeToReachTimestamp(t, rightJobID, setup.right.db, getLogicalDataReplicationJobInfo, waitTime, now) - - // TODO(ssd): Decide how we want to fingerprint - // this table while we are using in-row storage - // for crdb_internal_mvcc_timestamp. - var leftCount, rightCount int - selectQuery := fmt.Sprintf("SELECT count(1) FROM %s.%s", ldrWorkload.dbName, ldrWorkload.tableName) - setup.left.sysSQL.QueryRow(t, selectQuery).Scan(&leftCount) - setup.right.sysSQL.QueryRow(t, selectQuery).Scan(&rightCount) - require.Equal(t, leftCount, rightCount) + require.NoError(t, replicationtestutils.CheckEmptyDLQs(ctx, setup.left.db, ldrWorkload.dbName)) + require.NoError(t, replicationtestutils.CheckEmptyDLQs(ctx, setup.right.db, ldrWorkload.dbName)) + + m := c.NewMonitor(context.Background(), setup.CRDBNodes()) + var leftFingerprint, rightFingerprint [][]string + queryStmt := fmt.Sprintf("SHOW EXPERIMENTAL_FINGERPRINTS FROM TABLE %s.%s WITH EXCLUDE COLUMNS = ('crdb_replication_origin_timestamp')", ldrWorkload.dbName, ldrWorkload.tableName) + m.Go(func(ctx context.Context) error { + leftFingerprint = setup.left.sysSQL.QueryStr(t, queryStmt) + return nil + }) + m.Go(func(ctx context.Context) error { + rightFingerprint = setup.right.sysSQL.QueryStr(t, queryStmt) + return nil + }) + m.Wait() + require.Equal(t, leftFingerprint, rightFingerprint) } func getDebugZips(ctx context.Context, t test.Test, c cluster.Cluster, setup multiClusterSetup) {