diff --git a/pkg/cmd/roachtest/cdc.go b/pkg/cmd/roachtest/cdc.go index 7c67c87b76c1..8904b012de24 100644 --- a/pkg/cmd/roachtest/cdc.go +++ b/pkg/cmd/roachtest/cdc.go @@ -275,9 +275,16 @@ func runCDCBank(ctx context.Context, t *test, c *cluster) { ); err != nil { t.Fatal(err) } + + // NB: the WITH diff option was not supported until v20.1. + withDiff := t.IsBuildVersion("v20.1.0") + var opts = []string{`updated`, `resolved`} + if withDiff { + opts = append(opts, `diff`) + } var jobID string if err := db.QueryRow( - `CREATE CHANGEFEED FOR bank.bank INTO $1 WITH updated, resolved, diff`, kafka.sinkURL(ctx), + `CREATE CHANGEFEED FOR bank.bank INTO $1 WITH `+strings.Join(opts, `, `), kafka.sinkURL(ctx), ).Scan(&jobID); err != nil { t.Fatal(err) } @@ -314,19 +321,22 @@ func runCDCBank(ctx context.Context, t *test, c *cluster) { } const requestedResolved = 100 - baV, err := cdctest.NewBeforeAfterValidator(db, `bank.bank`) - if err != nil { - return err - } fprintV, err := cdctest.NewFingerprintValidator(db, `bank.bank`, `fprint`, tc.partitions, 0) if err != nil { return err } - v := cdctest.MakeCountValidator(cdctest.Validators{ + validators := cdctest.Validators{ cdctest.NewOrderValidator(`bank`), - baV, fprintV, - }) + } + if withDiff { + baV, err := cdctest.NewBeforeAfterValidator(db, `bank.bank`) + if err != nil { + return err + } + validators = append(validators, baV) + } + v := cdctest.MakeCountValidator(validators) for { m := tc.Next(ctx) @@ -387,10 +397,16 @@ func runCDCSchemaRegistry(ctx context.Context, t *test, c *cluster) { if _, err := db.Exec(`CREATE TABLE foo (a INT PRIMARY KEY)`); err != nil { t.Fatal(err) } + + // NB: the WITH diff option was not supported until v20.1. + withDiff := t.IsBuildVersion("v20.1.0") + var opts = []string{`updated`, `resolved`, `format=experimental_avro`, `confluent_schema_registry=$2`} + if withDiff { + opts = append(opts, `diff`) + } var jobID string if err := db.QueryRow( - `CREATE CHANGEFEED FOR foo INTO $1`+ - `WITH updated, resolved, diff, format=experimental_avro, confluent_schema_registry=$2`, + `CREATE CHANGEFEED FOR foo INTO $1 WITH `+strings.Join(opts, `, `), kafka.sinkURL(ctx), kafka.schemaRegistryURL(ctx), ).Scan(&jobID); err != nil { t.Fatal(err) @@ -448,14 +464,30 @@ func runCDCSchemaRegistry(ctx context.Context, t *test, c *cluster) { } sort.Strings(updated) - expected := []string{ - `{"before":null,"after":{"foo":{"a":{"long":1}}},"updated":{"string":""}}`, - `{"before":null,"after":{"foo":{"a":{"long":2},"b":{"string":"2"}}},"updated":{"string":""}}`, - `{"before":null,"after":{"foo":{"a":{"long":3},"b":{"string":"3"},"c":{"long":3}}},"updated":{"string":""}}`, - `{"before":null,"after":{"foo":{"a":{"long":4},"c":{"long":4}}},"updated":{"string":""}}`, - `{"before":{"foo_before":{"a":{"long":1},"c":null}},"after":{"foo":{"a":{"long":1},"c":null}},"updated":{"string":""}}`, - `{"before":{"foo_before":{"a":{"long":2},"c":null}},"after":{"foo":{"a":{"long":2},"c":null}},"updated":{"string":""}}`, - `{"before":{"foo_before":{"a":{"long":3},"c":{"long":3}}},"after":{"foo":{"a":{"long":3},"c":{"long":3}}},"updated":{"string":""}}`, + var expected []string + if withDiff { + expected = []string{ + `{"before":null,"after":{"foo":{"a":{"long":1}}},"updated":{"string":""}}`, + `{"before":null,"after":{"foo":{"a":{"long":2},"b":{"string":"2"}}},"updated":{"string":""}}`, + `{"before":null,"after":{"foo":{"a":{"long":3},"b":{"string":"3"},"c":{"long":3}}},"updated":{"string":""}}`, + `{"before":null,"after":{"foo":{"a":{"long":4},"c":{"long":4}}},"updated":{"string":""}}`, + `{"before":{"foo_before":{"a":{"long":1},"b":null,"c":null}},"after":{"foo":{"a":{"long":1},"c":null}},"updated":{"string":""}}`, + `{"before":{"foo_before":{"a":{"long":1},"c":null}},"after":{"foo":{"a":{"long":1},"c":null}},"updated":{"string":""}}`, + `{"before":{"foo_before":{"a":{"long":2},"b":{"string":"2"},"c":null}},"after":{"foo":{"a":{"long":2},"c":null}},"updated":{"string":""}}`, + `{"before":{"foo_before":{"a":{"long":2},"c":null}},"after":{"foo":{"a":{"long":2},"c":null}},"updated":{"string":""}}`, + `{"before":{"foo_before":{"a":{"long":3},"b":{"string":"3"},"c":{"long":3}}},"after":{"foo":{"a":{"long":3},"c":{"long":3}}},"updated":{"string":""}}`, + `{"before":{"foo_before":{"a":{"long":3},"c":{"long":3}}},"after":{"foo":{"a":{"long":3},"c":{"long":3}}},"updated":{"string":""}}`, + } + } else { + expected = []string{ + `{"updated":{"string":""},"after":{"foo":{"a":{"long":1},"c":null}}}`, + `{"updated":{"string":""},"after":{"foo":{"a":{"long":1}}}}`, + `{"updated":{"string":""},"after":{"foo":{"a":{"long":2},"b":{"string":"2"}}}}`, + `{"updated":{"string":""},"after":{"foo":{"a":{"long":2},"c":null}}}`, + `{"updated":{"string":""},"after":{"foo":{"a":{"long":3},"b":{"string":"3"},"c":{"long":3}}}}`, + `{"updated":{"string":""},"after":{"foo":{"a":{"long":3},"c":{"long":3}}}}`, + `{"updated":{"string":""},"after":{"foo":{"a":{"long":4},"c":{"long":4}}}}`, + } } if strings.Join(expected, "\n") != strings.Join(updated, "\n") { t.Fatalf("expected\n%s\n\ngot\n%s\n\n",