diff --git a/pkg/cmd/roachtest/cdc.go b/pkg/cmd/roachtest/cdc.go index 98e027cbeafd..5988e327d8b6 100644 --- a/pkg/cmd/roachtest/cdc.go +++ b/pkg/cmd/roachtest/cdc.go @@ -48,6 +48,7 @@ type cdcTestArgs struct { tpccWarehouseCount int workloadDuration string initialScan bool + rangefeed bool kafkaChaos bool crdbChaos bool @@ -68,6 +69,11 @@ func cdcBasicTest(ctx context.Context, t *test, c *cluster, args cdcTestArgs) { db := c.Conn(ctx, 1) defer stopFeeds(db) + if _, err := db.Exec( + `SET CLUSTER SETTING kv.rangefeed.enabled = $1`, args.rangefeed, + ); err != nil { + t.Fatal(err) + } t.Status("installing kafka") kafka := kafkaManager{ @@ -343,6 +349,24 @@ func registerCDC(r *registry) { }) }, }) + r.Add(testSpec{ + Name: "cdc/rangefeed-unstable", + Skip: `resolved timestamps are not yet reliable with RangeFeed`, + MinVersion: "2.2.0", + Nodes: nodes(4, cpu(16)), + Run: func(ctx context.Context, t *test, c *cluster) { + cdcBasicTest(ctx, t, c, cdcTestArgs{ + workloadType: tpccWorkloadType, + tpccWarehouseCount: 100, + workloadDuration: "30m", + initialScan: false, + rangefeed: true, + kafkaChaos: false, + targetInitialScanLatency: 30 * time.Minute, + targetSteadyLatency: time.Minute, + }) + }, + }) r.Add(testSpec{ Name: "cdc/sink-chaos", MinVersion: "2.1.0",