Skip to content

Commit

Permalink
roachtest: fingerprint at end of LDR roachtest
Browse files Browse the repository at this point in the history
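This patch adds two verifications at the end of the LDR roachtest: it checks
that the DLQ (dead letter queue) tables are empty on both clusters, and it
fingerprints the replicated tables on each side and requires the fingerprints
to match.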

Epic: none

Release note: none
  • Loading branch information
msbutler committed Sep 16, 2024
1 parent 31ca8eb commit 4f71ca6
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 18 deletions.
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ go_library(
"//pkg/ccl/changefeedccl",
"//pkg/ccl/changefeedccl/cdctest",
"//pkg/ccl/changefeedccl/changefeedbase",
"//pkg/ccl/crosscluster/replicationtestutils",
"//pkg/ccl/crosscluster/replicationutils",
"//pkg/ccl/storageccl/engineccl/enginepbccl",
"//pkg/cli",
Expand Down
51 changes: 33 additions & 18 deletions pkg/cmd/roachtest/tests/logical_data_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"sync"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/replicationtestutils"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
Expand Down Expand Up @@ -175,17 +176,22 @@ func registerLogicalDataReplicationTests(r registry.Registry) {

func TestLDRBasic(ctx context.Context, t test.Test, c cluster.Cluster, setup multiClusterSetup) {
duration := 15 * time.Minute
initRows := 1000
maxBlockBytes := 1024

if c.IsLocal() {
duration = 5 * time.Minute
duration = 30 * time.Second
initRows = 10
maxBlockBytes = 32
}

ldrWorkload := LDRWorkload{
workload: replicateKV{
readPercent: 0,
debugRunDuration: duration,
maxBlockBytes: 1024,
initRows: 1000,
initWithSplitAndScatter: true},
maxBlockBytes: maxBlockBytes,
initRows: initRows,
initWithSplitAndScatter: !c.IsLocal()},
dbName: "kv",
tableName: "kv",
}
Expand Down Expand Up @@ -231,7 +237,7 @@ func TestLDRBasic(ctx context.Context, t test.Test, c cluster.Cluster, setup mul
llv.assertValid(t)
rlv.assertValid(t)

VerifyCorrectness(t, setup, leftJobID, rightJobID, 2*time.Minute, ldrWorkload)
VerifyCorrectness(ctx, c, t, setup, leftJobID, rightJobID, 2*time.Minute, ldrWorkload)
}

func TestLDRSchemaChange(
Expand Down Expand Up @@ -307,7 +313,7 @@ func TestLDRSchemaChange(
llv.assertValid(t)
rlv.assertValid(t)

VerifyCorrectness(t, setup, leftJobID, rightJobID, 2*time.Minute, ldrWorkload)
VerifyCorrectness(ctx, c, t, setup, leftJobID, rightJobID, 2*time.Minute, ldrWorkload)
}

func TestLDRUpdateHeavy(
Expand Down Expand Up @@ -371,7 +377,7 @@ func TestLDRUpdateHeavy(
llv.assertValid(t)
rlv.assertValid(t)

VerifyCorrectness(t, setup, leftJobID, rightJobID, 2*time.Minute, ldrWorkload)
VerifyCorrectness(ctx, c, t, setup, leftJobID, rightJobID, 2*time.Minute, ldrWorkload)
}

func TestLDROnNodeShutdown(
Expand Down Expand Up @@ -463,7 +469,7 @@ func TestLDROnNodeShutdown(
}

monitor.Wait()
VerifyCorrectness(t, setup, leftJobID, rightJobID, 5*time.Minute, ldrWorkload)
VerifyCorrectness(ctx, c, t, setup, leftJobID, rightJobID, 5*time.Minute, ldrWorkload)
}

// TestLDROnNetworkPartition aims to see what happens when both clusters
Expand Down Expand Up @@ -522,7 +528,7 @@ func TestLDROnNetworkPartition(
t.L().Printf("Nodes reconnected. Waiting for workload to complete")

monitor.Wait()
VerifyCorrectness(t, setup, leftJobID, rightJobID, 5*time.Minute, ldrWorkload)
VerifyCorrectness(ctx, c, t, setup, leftJobID, rightJobID, 5*time.Minute, ldrWorkload)
}

type ldrJobInfo struct {
Expand Down Expand Up @@ -709,6 +715,8 @@ func setupLDR(
}

func VerifyCorrectness(
ctx context.Context,
c cluster.Cluster,
t test.Test,
setup multiClusterSetup,
leftJobID, rightJobID int,
Expand All @@ -720,15 +728,22 @@ func VerifyCorrectness(

waitForReplicatedTimeToReachTimestamp(t, leftJobID, setup.left.db, getLogicalDataReplicationJobInfo, waitTime, now)
waitForReplicatedTimeToReachTimestamp(t, rightJobID, setup.right.db, getLogicalDataReplicationJobInfo, waitTime, now)

// TODO(ssd): Decide how we want to fingerprint
// this table while we are using in-row storage
// for crdb_internal_mvcc_timestamp.
var leftCount, rightCount int
selectQuery := fmt.Sprintf("SELECT count(1) FROM %s.%s", ldrWorkload.dbName, ldrWorkload.tableName)
setup.left.sysSQL.QueryRow(t, selectQuery).Scan(&leftCount)
setup.right.sysSQL.QueryRow(t, selectQuery).Scan(&rightCount)
require.Equal(t, leftCount, rightCount)
require.NoError(t, replicationtestutils.CheckEmptyDLQs(ctx, setup.left.db, ldrWorkload.dbName))
require.NoError(t, replicationtestutils.CheckEmptyDLQs(ctx, setup.right.db, ldrWorkload.dbName))

m := c.NewMonitor(context.Background(), setup.CRDBNodes())
var leftFingerprint, rightFingerprint [][]string
queryStmt := fmt.Sprintf("SHOW EXPERIMENTAL_FINGERPRINTS FROM TABLE %s.%s WITH EXCLUDE COLUMNS = ('crdb_replication_origin_timestamp')", ldrWorkload.dbName, ldrWorkload.tableName)
m.Go(func(ctx context.Context) error {
leftFingerprint = setup.left.sysSQL.QueryStr(t, queryStmt)
return nil
})
m.Go(func(ctx context.Context) error {
rightFingerprint = setup.right.sysSQL.QueryStr(t, queryStmt)
return nil
})
m.Wait()
require.Equal(t, leftFingerprint, rightFingerprint)
}

func getDebugZips(ctx context.Context, t test.Test, c cluster.Cluster, setup multiClusterSetup) {
Expand Down

0 comments on commit 4f71ca6

Please sign in to comment.