Skip to content

Commit

Permalink
roachtest: add mixed version testing for CDC.
Browse files Browse the repository at this point in the history
This commit adds the `cdc/mixed-versions` roachtest. It builds on top
of the existing infrastructure for mixed version testing in order to
create a test scenario that reproduces the issue observed by DoorDash
when upgrading from 21.2 to 22.1.0.

Following the pattern present in other mixed version tests, this test
starts the cluster at the previous version, upgrades the binaries to
the current version, rolls them back, and then finally performs the
binary upgrade again, allowing the upgrade to finalize.

This roachtest uses the `FingerprintValidator` infrastructure used in
CDC unit tests (and also in the currently skipped `cdc/bank`
roachtest). This validator gives us strong guarantees that the events
produced by a changefeed are correct even in the face of ongoing
upgrades.

This roachtest fails on v22.1.0; once the upgrade to v22.1 is
finalized, the Kafka consumer won't see further updates, and the test
eventually times out.

Release note: None.

roachtest: change target table for mixed-version CDC test.

Release note: None.
  • Loading branch information
renatolabs committed Aug 10, 2022
1 parent a40b562 commit b17d38d
Show file tree
Hide file tree
Showing 7 changed files with 392 additions and 22 deletions.
29 changes: 27 additions & 2 deletions pkg/ccl/changefeedccl/cdctest/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,10 +475,16 @@ func (v *fingerprintValidator) applyRowUpdate(row validatorRow) (_err error) {
return err
}

if string(primaryKeyJSON) != row.key {
rowKey := row.key
if len(primaryKeyDatums) > 1 {
// format the key using the Go marshaller; otherwise, differences
// in formatting could lead to the comparison below failing
rowKey = asGoJSON(row.key)
}
if string(primaryKeyJSON) != rowKey {
v.failures = append(v.failures,
fmt.Sprintf(`key %s did not match expected key %s for value %s`,
row.key, primaryKeyJSON, row.value))
rowKey, primaryKeyJSON, row.value))
}
} else {
// DELETE
Expand Down Expand Up @@ -734,3 +740,22 @@ func fetchPrimaryKeyCols(sqlDB *gosql.DB, tableStr string) ([]string, error) {
}
return primaryKeyCols, nil
}

// asGoJSON attempts to round-trip the given string through the Go
// standard library's JSON marshaller. If the input parses as JSON,
// the re-marshalled form is returned; this normalizes formatting
// (whitespace, key ordering) so two JSON strings produced by writers
// we don't control can be compared directly. Strings that are not
// valid JSON are returned unchanged.
func asGoJSON(s string) string {
	var parsed interface{}
	if unmarshalErr := gojson.Unmarshal([]byte(s), &parsed); unmarshalErr != nil {
		// Not JSON; there is nothing to normalize.
		return s
	}

	normalized, marshalErr := gojson.Marshal(parsed)
	if marshalErr != nil {
		return s
	}
	return string(normalized)
}
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ go_library(
"liquibase_blocklist.go",
"loss_of_quorum_recovery.go",
"many_splits.go",
"mixed_version_cdc.go",
"mixed_version_decommission.go",
"mixed_version_jobs.go",
"mixed_version_schemachange.go",
Expand Down
44 changes: 28 additions & 16 deletions pkg/cmd/roachtest/tests/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,6 @@ func cdcBasicTest(ctx context.Context, t test.Test, c cluster.Cluster, args cdcT
}

func runCDCBank(ctx context.Context, t test.Test, c cluster.Cluster) {

// Make the logs dir on every node to work around the `roachprod get logs`
// spam.
c.Run(ctx, c.All(), `mkdir -p logs`)
Expand All @@ -305,21 +304,9 @@ func runCDCBank(ctx context.Context, t test.Test, c cluster.Cluster) {
c.Put(ctx, t.Cockroach(), "./cockroach", crdbNodes)
c.Put(ctx, t.DeprecatedWorkload(), "./workload", workloadNode)
c.Start(ctx, t.L(), option.DefaultStartOpts(), install.MakeClusterSettings(), crdbNodes)
kafka := kafkaManager{
t: t,
c: c,
nodes: kafkaNode,
}
kafka.install(ctx)
if !c.IsLocal() {
// TODO(dan): This test currently connects to kafka from the test
// runner, so kafka needs to advertise the external address. Better
// would be a binary we could run on one of the roachprod machines.
c.Run(ctx, kafka.nodes, `echo "advertised.listeners=PLAINTEXT://`+kafka.consumerURL(ctx)+`" >> `+
filepath.Join(kafka.configDir(), "server.properties"))
}
kafka.start(ctx, "kafka")
defer kafka.stop(ctx)

kafka, cleanup := setupKafka(ctx, t, c, kafkaNode)
defer cleanup()

t.Status("creating kafka topic")
if err := kafka.createTopic(ctx, "bank"); err != nil {
Expand Down Expand Up @@ -1834,6 +1821,31 @@ func stopFeeds(db *gosql.DB) {
)`)
}

// setupKafka installs Kafka on the given nodes and configures it so
// that the test runner can connect to it. It returns the manager for
// the installation along with a teardown function that stops Kafka;
// callers should defer the teardown at the end of the test.
func setupKafka(
	ctx context.Context, t test.Test, c cluster.Cluster, nodes option.NodeListOption,
) (kafkaManager, func()) {
	mgr := kafkaManager{t: t, c: c, nodes: nodes}
	mgr.install(ctx)

	if !c.IsLocal() {
		// TODO(dan): This test currently connects to kafka from the test
		// runner, so kafka needs to advertise the external address. Better
		// would be a binary we could run on one of the roachprod machines.
		advertiseCmd := `echo "advertised.listeners=PLAINTEXT://` + mgr.consumerURL(ctx) + `" >> ` +
			filepath.Join(mgr.configDir(), "server.properties")
		c.Run(ctx, mgr.nodes, advertiseCmd)
	}

	mgr.start(ctx, "kafka")
	teardown := func() { mgr.stop(ctx) }
	return mgr, teardown
}

type topicConsumer struct {
sarama.Consumer

Expand Down
Loading

0 comments on commit b17d38d

Please sign in to comment.