From 904f0ed3d2aff6b4eec3dce810af2731ff0e7860 Mon Sep 17 00:00:00 2001 From: Michael Butler Date: Thu, 10 Oct 2024 15:08:03 -0400 Subject: [PATCH] roachtest: run perf tests with sql and kv mode Epic: none Release note: none --- .../tests/logical_data_replication.go | 95 +++++++++++++++---- 1 file changed, 77 insertions(+), 18 deletions(-) diff --git a/pkg/cmd/roachtest/tests/logical_data_replication.go b/pkg/cmd/roachtest/tests/logical_data_replication.go index 68dac634cf65..053fa4d76c68 100644 --- a/pkg/cmd/roachtest/tests/logical_data_replication.go +++ b/pkg/cmd/roachtest/tests/logical_data_replication.go @@ -76,7 +76,7 @@ type LDRWorkload struct { func registerLogicalDataReplicationTests(r registry.Registry) { specs := []ldrTestSpec{ { - name: "ldr/kv0/workload=both/ingestion", + name: "ldr/kv0/workload=both/basic/immediate", clusterSpec: multiClusterSpec{ leftNodes: 3, rightNodes: 3, @@ -87,10 +87,11 @@ func registerLogicalDataReplicationTests(r registry.Registry) { spec.VolumeSize(100), }, }, - run: TestLDRBasic, + mode: ModeImmediate, + run: TestLDRBasic, }, { - name: "ldr/kv0/workload=both/update_heavy", + name: "ldr/kv0/workload=both/basic/validated", clusterSpec: multiClusterSpec{ leftNodes: 3, rightNodes: 3, @@ -101,7 +102,38 @@ func registerLogicalDataReplicationTests(r registry.Registry) { spec.VolumeSize(100), }, }, - run: TestLDRUpdateHeavy, + mode: ModeValidated, + run: TestLDRBasic, + }, + { + name: "ldr/kv0/workload=both/update_heavy/immediate", + clusterSpec: multiClusterSpec{ + leftNodes: 3, + rightNodes: 3, + clusterOpts: []spec.Option{ + spec.CPU(8), + spec.WorkloadNode(), + spec.WorkloadNodeCPU(8), + spec.VolumeSize(100), + }, + }, + mode: ModeImmediate, + run: TestLDRUpdateHeavy, + }, + { + name: "ldr/kv0/workload=both/update_heavy/validated", + clusterSpec: multiClusterSpec{ + leftNodes: 3, + rightNodes: 3, + clusterOpts: []spec.Option{ + spec.CPU(8), + spec.WorkloadNode(), + spec.WorkloadNodeCPU(8), + spec.VolumeSize(100), + }, + }, + mode: ModeValidated, + run: TestLDRUpdateHeavy, }, { name: "ldr/kv0/workload=both/shutdown_node", @@ -148,6 +180,7 @@ func registerLogicalDataReplicationTests(r registry.Registry) { } for _, sp := range specs { + r.Add(registry.TestSpec{ Name: sp.name, Owner: registry.OwnerDisasterRecovery, @@ -167,14 +200,15 @@ func registerLogicalDataReplicationTests(r registry.Registry) { } setup, cleanup := mc.Start(ctx, t) defer cleanup() - sp.run(ctx, t, c, setup) - + sp.run(ctx, t, c, setup, sp.mode) }, }) } } -func TestLDRBasic(ctx context.Context, t test.Test, c cluster.Cluster, setup multiClusterSetup) { +func TestLDRBasic( + ctx context.Context, t test.Test, c cluster.Cluster, setup multiClusterSetup, mode mode, +) { duration := 15 * time.Minute initRows := 1000 maxBlockBytes := 1024 @@ -196,7 +230,7 @@ func TestLDRBasic(ctx context.Context, t test.Test, c cluster.Cluster, setup mul tableName: "kv", } - leftJobID, rightJobID := setupLDR(ctx, t, c, setup, ldrWorkload) + leftJobID, rightJobID := setupLDR(ctx, t, c, setup, ldrWorkload, mode) // Setup latency verifiers maxExpectedLatency := 2 * time.Minute @@ -241,7 +275,7 @@ func TestLDRBasic(ctx context.Context, t test.Test, c cluster.Cluster, setup mul } func TestLDRSchemaChange( - ctx context.Context, t test.Test, c cluster.Cluster, setup multiClusterSetup, + ctx context.Context, t test.Test, c cluster.Cluster, setup multiClusterSetup, mode mode, ) { duration := 15 * time.Minute if c.IsLocal() { @@ -259,7 +293,7 @@ func TestLDRSchemaChange( tableName: "kv", } - leftJobID, rightJobID := setupLDR(ctx, t, c, setup, ldrWorkload) + leftJobID, rightJobID := setupLDR(ctx, t, c, setup, ldrWorkload, mode) // Setup latency verifiers maxExpectedLatency := 2 * time.Minute @@ -317,7 +351,7 @@ func TestLDRSchemaChange( } func TestLDRUpdateHeavy( - ctx context.Context, t test.Test, c cluster.Cluster, setup multiClusterSetup, + ctx context.Context, t test.Test, c cluster.Cluster, setup multiClusterSetup, mode mode, ) { duration := 10 * time.Minute @@ -336,7 +370,7 @@ func TestLDRUpdateHeavy( tableName: "usertable", } - leftJobID, rightJobID := setupLDR(ctx, t, c, setup, ldrWorkload) + leftJobID, rightJobID := setupLDR(ctx, t, c, setup, ldrWorkload, mode) // Setup latency verifiers maxExpectedLatency := 3 * time.Minute @@ -381,7 +415,7 @@ func TestLDRUpdateHeavy( } func TestLDROnNodeShutdown( - ctx context.Context, t test.Test, c cluster.Cluster, setup multiClusterSetup, + ctx context.Context, t test.Test, c cluster.Cluster, setup multiClusterSetup, mode mode, ) { duration := 10 * time.Minute @@ -400,7 +434,7 @@ func TestLDROnNodeShutdown( tableName: "kv", } - leftJobID, rightJobID := setupLDR(ctx, t, c, setup, ldrWorkload) + leftJobID, rightJobID := setupLDR(ctx, t, c, setup, ldrWorkload, mode) // Setup latency verifiers, remembering to account for latency spike from killing a node maxExpectedLatency := 5 * time.Minute @@ -477,7 +511,7 @@ func TestLDROnNodeShutdown( // aim to keep the workload going on both sides and wait for reconciliation // once the network partition has completed func TestLDROnNetworkPartition( - ctx context.Context, t test.Test, c cluster.Cluster, setup multiClusterSetup, + ctx context.Context, t test.Test, c cluster.Cluster, setup multiClusterSetup, mode mode, ) { duration := 10 * time.Minute if c.IsLocal() { @@ -495,7 +529,7 @@ func TestLDROnNetworkPartition( tableName: "kv", } - leftJobID, rightJobID := setupLDR(ctx, t, c, setup, ldrWorkload) + leftJobID, rightJobID := setupLDR(ctx, t, c, setup, ldrWorkload, mode) monitor := c.NewMonitor(ctx, setup.CRDBNodes()) monitor.Go(func(ctx context.Context) error { @@ -557,9 +591,29 @@ func getLogicalDataReplicationJobInfo(db *gosql.DB, jobID int) (jobInfo, error) type ldrTestSpec struct { name string clusterSpec multiClusterSpec - run func(context.Context, test.Test, cluster.Cluster, multiClusterSetup) + run func(context.Context, test.Test, cluster.Cluster, multiClusterSetup, mode) + mode mode +} + +type mode int + +func (m mode) String() string { + switch m { + case ModeImmediate: + return "immediate" + case ModeValidated: + return "validated" + default: + return "default" + } } +const ( + Default = iota + ModeImmediate + ModeValidated +) + type multiClusterSpec struct { clusterOpts []spec.Option @@ -674,6 +728,7 @@ func setupLDR( c cluster.Cluster, setup multiClusterSetup, ldrWorkload LDRWorkload, + mode mode, ) (int, int) { c.Run(ctx, option.WithNodes(setup.workloadNode), @@ -690,9 +745,13 @@ func setupLDR( setup.right.sysSQL.Exec(t, timestampQuery) startLDR := func(targetDB *sqlutils.SQLRunner, sourceURL string) int { + options := "" + if mode.String() != "" { + options = fmt.Sprintf("WITH mode='%s'", mode) + } targetDB.Exec(t, fmt.Sprintf("USE %s", dbName)) r := targetDB.QueryRow(t, - fmt.Sprintf("CREATE LOGICAL REPLICATION STREAM FROM TABLE %s ON $1 INTO TABLE %s", tableName, tableName), + fmt.Sprintf("CREATE LOGICAL REPLICATION STREAM FROM TABLE %s ON $1 INTO TABLE %s %s", tableName, tableName, options), sourceURL, ) var jobID int