Skip to content

Commit

Permalink
Merge #130718
Browse files Browse the repository at this point in the history
130718: crosscluster/logical: check for empty DLQ and fingerprint in ldr roachtests r=azhu-crl a=msbutler

Epic: none

Release note: none

Co-authored-by: Michael Butler <[email protected]>
  • Loading branch information
craig[bot] and msbutler committed Sep 17, 2024
2 parents f3311eb + 4f71ca6 commit caa0ada
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 21 deletions.
5 changes: 3 additions & 2 deletions pkg/ccl/crosscluster/logical/logical_replication_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
"github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/replicationtestutils"
"github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/replicationutils"
"github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/streamclient"
_ "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/streamclient/randclient"
Expand Down Expand Up @@ -697,7 +698,7 @@ func TestRandomTables(t *testing.T) {
runnerB.QueryRow(t, streamStartStmt, dbAURL.String()).Scan(&jobBID)

WaitUntilReplicatedTime(t, s.Clock().Now(), runnerB, jobBID)

require.NoError(t, replicationtestutils.CheckEmptyDLQs(ctx, runnerB.DB, "b"))
compareReplicatedTables(t, s, "a", "b", tableName, runnerA, runnerB)
}

Expand Down Expand Up @@ -829,7 +830,7 @@ func TestPreviouslyInterestingTables(t *testing.T) {
runnerA.Exec(t, fmt.Sprintf("DELETE FROM %s LIMIT 5", tableName))
WaitUntilReplicatedTime(t, s.Clock().Now(), runnerB, jobBID)
}

require.NoError(t, replicationtestutils.CheckEmptyDLQs(ctx, runnerB.DB, "b"))
compareReplicatedTables(t, s, "a", "b", tableName, runnerA, runnerB)
})
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/crosscluster/logical/udf_row_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"fmt"
"testing"

"github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/replicationtestutils"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/sql/randgen"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand Down Expand Up @@ -103,7 +104,7 @@ func TestUDFWithRandomTables(t *testing.T) {
WaitUntilReplicatedTime(t, s.Clock().Now(), runnerB, jobBID)
runnerA.Exec(t, fmt.Sprintf("DELETE FROM %s LIMIT 5", tableName))
WaitUntilReplicatedTime(t, s.Clock().Now(), runnerB, jobBID)

require.NoError(t, replicationtestutils.CheckEmptyDLQs(ctx, runnerB.DB, "b"))
compareReplicatedTables(t, s, "a", "b", tableName, runnerA, runnerB)
}

Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/crosscluster/replicationtestutils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go_library(
testonly = 1,
srcs = [
"encoding.go",
"logical.go",
"replication_helpers.go",
"span_config_helpers.go",
"testutils.go",
Expand Down
44 changes: 44 additions & 0 deletions pkg/ccl/crosscluster/replicationtestutils/logical.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2024 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package replicationtestutils

import (
"context"
"fmt"

"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/errors"
)

func CheckEmptyDLQs(ctx context.Context, db sqlutils.DBHandle, dbName string) error {
dlqNameQuery := fmt.Sprintf("SELECT table_name FROM [SHOW TABLES FROM %s] where schema_name = 'crdb_replication'", dbName)
rows, err := db.QueryContext(ctx, dlqNameQuery)
if err != nil {
return errors.Wrapf(err, "failed to query dlq table name for database %s", dbName)
}
defer rows.Close()

var dlqTableName string
var dlqRowCount int
for rows.Next() {
if err := rows.Scan(&dlqTableName); err != nil {
return errors.Wrapf(err, "failed to scan dlq table name for database %s", dbName)
}
if err := db.QueryRowContext(ctx, fmt.Sprintf("SELECT count(*) FROM %s.crdb_replication.%s", dbName, dlqTableName)).Scan(&dlqRowCount); err != nil {
return err
}
if dlqRowCount != 0 {
return fmt.Errorf("expected DLQ to be empty, but found %d rows", dlqRowCount)
}
}
if dlqTableName == "" {
return errors.Newf("didn't find any any dlq tables in database %s", dbName)
}
return nil
}
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ go_library(
"//pkg/ccl/changefeedccl",
"//pkg/ccl/changefeedccl/cdctest",
"//pkg/ccl/changefeedccl/changefeedbase",
"//pkg/ccl/crosscluster/replicationtestutils",
"//pkg/ccl/crosscluster/replicationutils",
"//pkg/ccl/storageccl/engineccl/enginepbccl",
"//pkg/cli",
Expand Down
51 changes: 33 additions & 18 deletions pkg/cmd/roachtest/tests/logical_data_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"sync"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/replicationtestutils"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
Expand Down Expand Up @@ -175,17 +176,22 @@ func registerLogicalDataReplicationTests(r registry.Registry) {

func TestLDRBasic(ctx context.Context, t test.Test, c cluster.Cluster, setup multiClusterSetup) {
duration := 15 * time.Minute
initRows := 1000
maxBlockBytes := 1024

if c.IsLocal() {
duration = 5 * time.Minute
duration = 30 * time.Second
initRows = 10
maxBlockBytes = 32
}

ldrWorkload := LDRWorkload{
workload: replicateKV{
readPercent: 0,
debugRunDuration: duration,
maxBlockBytes: 1024,
initRows: 1000,
initWithSplitAndScatter: true},
maxBlockBytes: maxBlockBytes,
initRows: initRows,
initWithSplitAndScatter: !c.IsLocal()},
dbName: "kv",
tableName: "kv",
}
Expand Down Expand Up @@ -231,7 +237,7 @@ func TestLDRBasic(ctx context.Context, t test.Test, c cluster.Cluster, setup mul
llv.assertValid(t)
rlv.assertValid(t)

VerifyCorrectness(t, setup, leftJobID, rightJobID, 2*time.Minute, ldrWorkload)
VerifyCorrectness(ctx, c, t, setup, leftJobID, rightJobID, 2*time.Minute, ldrWorkload)
}

func TestLDRSchemaChange(
Expand Down Expand Up @@ -307,7 +313,7 @@ func TestLDRSchemaChange(
llv.assertValid(t)
rlv.assertValid(t)

VerifyCorrectness(t, setup, leftJobID, rightJobID, 2*time.Minute, ldrWorkload)
VerifyCorrectness(ctx, c, t, setup, leftJobID, rightJobID, 2*time.Minute, ldrWorkload)
}

func TestLDRUpdateHeavy(
Expand Down Expand Up @@ -371,7 +377,7 @@ func TestLDRUpdateHeavy(
llv.assertValid(t)
rlv.assertValid(t)

VerifyCorrectness(t, setup, leftJobID, rightJobID, 2*time.Minute, ldrWorkload)
VerifyCorrectness(ctx, c, t, setup, leftJobID, rightJobID, 2*time.Minute, ldrWorkload)
}

func TestLDROnNodeShutdown(
Expand Down Expand Up @@ -463,7 +469,7 @@ func TestLDROnNodeShutdown(
}

monitor.Wait()
VerifyCorrectness(t, setup, leftJobID, rightJobID, 5*time.Minute, ldrWorkload)
VerifyCorrectness(ctx, c, t, setup, leftJobID, rightJobID, 5*time.Minute, ldrWorkload)
}

// TestLDROnNetworkPartition aims to see what happens when both clusters
Expand Down Expand Up @@ -522,7 +528,7 @@ func TestLDROnNetworkPartition(
t.L().Printf("Nodes reconnected. Waiting for workload to complete")

monitor.Wait()
VerifyCorrectness(t, setup, leftJobID, rightJobID, 5*time.Minute, ldrWorkload)
VerifyCorrectness(ctx, c, t, setup, leftJobID, rightJobID, 5*time.Minute, ldrWorkload)
}

type ldrJobInfo struct {
Expand Down Expand Up @@ -709,6 +715,8 @@ func setupLDR(
}

func VerifyCorrectness(
ctx context.Context,
c cluster.Cluster,
t test.Test,
setup multiClusterSetup,
leftJobID, rightJobID int,
Expand All @@ -720,15 +728,22 @@ func VerifyCorrectness(

waitForReplicatedTimeToReachTimestamp(t, leftJobID, setup.left.db, getLogicalDataReplicationJobInfo, waitTime, now)
waitForReplicatedTimeToReachTimestamp(t, rightJobID, setup.right.db, getLogicalDataReplicationJobInfo, waitTime, now)

// TODO(ssd): Decide how we want to fingerprint
// this table while we are using in-row storage
// for crdb_internal_mvcc_timestamp.
var leftCount, rightCount int
selectQuery := fmt.Sprintf("SELECT count(1) FROM %s.%s", ldrWorkload.dbName, ldrWorkload.tableName)
setup.left.sysSQL.QueryRow(t, selectQuery).Scan(&leftCount)
setup.right.sysSQL.QueryRow(t, selectQuery).Scan(&rightCount)
require.Equal(t, leftCount, rightCount)
require.NoError(t, replicationtestutils.CheckEmptyDLQs(ctx, setup.left.db, ldrWorkload.dbName))
require.NoError(t, replicationtestutils.CheckEmptyDLQs(ctx, setup.right.db, ldrWorkload.dbName))

m := c.NewMonitor(context.Background(), setup.CRDBNodes())
var leftFingerprint, rightFingerprint [][]string
queryStmt := fmt.Sprintf("SHOW EXPERIMENTAL_FINGERPRINTS FROM TABLE %s.%s WITH EXCLUDE COLUMNS = ('crdb_replication_origin_timestamp')", ldrWorkload.dbName, ldrWorkload.tableName)
m.Go(func(ctx context.Context) error {
leftFingerprint = setup.left.sysSQL.QueryStr(t, queryStmt)
return nil
})
m.Go(func(ctx context.Context) error {
rightFingerprint = setup.right.sysSQL.QueryStr(t, queryStmt)
return nil
})
m.Wait()
require.Equal(t, leftFingerprint, rightFingerprint)
}

func getDebugZips(ctx context.Context, t test.Test, c cluster.Cluster, setup multiClusterSetup) {
Expand Down

0 comments on commit caa0ada

Please sign in to comment.