roachtest: add mixed version testing for CDC.
This commit adds the `cdc/mixed-versions` roachtest. It builds on the
existing mixed-version testing infrastructure to create a scenario
that reproduces the issue observed by DoorDash when upgrading from
21.2 to 22.1.0.

Following the pattern used in other mixed-version tests, this test
starts the cluster at the previous version, upgrades the binaries to
the current version, rolls them back, and then performs the binary
upgrade again, allowing the upgrade to finalize.
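
In outline, the upgrade choreography looks roughly like the sketch
below (condensed from the new test; `nodes` stands for the CockroachDB
nodes, and the step helpers come from the existing mixed-version
framework):

    newVersionUpgradeTest(c,
        uploadAndStartFromCheckpointFixture(nodes, predecessorVersion),
        waitForUpgradeStep(nodes),
        preventAutoUpgradeStep(1),
        // ... create the changefeed and start the workload ...
        binaryUpgradeStep(nodes, mainVersion),        // upgrade binaries
        binaryUpgradeStep(nodes, predecessorVersion), // roll back
        binaryUpgradeStep(nodes, mainVersion),        // upgrade again
        allowAutoUpgradeStep(1),
        waitForUpgradeStep(nodes),                    // finalize
    ).run(ctx, t)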

This roachtest reuses the `FingerprintValidator` infrastructure from
the CDC unit tests (and the currently skipped `cdc/bank` roachtest).
This validator gives us strong guarantees that the events produced by
a changefeed are correct, even in the face of ongoing upgrades.
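
Roughly, the verifier feeds every Kafka message into the validator and
checks for failures afterwards, along the lines of the condensed
sketch below (the full consumer loop lives in the new test;
`partition`, `key`, `value`, `updated`, and `resolved` are parsed from
each message):

    validator := cdctest.MakeCountValidator(cdctest.Validators{
        cdctest.NewOrderValidator("tpcc.warehouse"),
        fprintV, // from cdctest.NewFingerprintValidator(db, `tpcc.warehouse`, `fprint`, ...)
    })
    _ = validator.NoteRow(partition, key, value, updated)
    _ = validator.NoteResolved(partition, resolved)
    if failures := validator.Failures(); len(failures) > 0 {
        t.Fatalf("validator failures:\n%s", strings.Join(failures, "\n"))
    }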

This roachtest fails against v22.1.0: once the upgrade to v22.1 is
finalized, the Kafka consumer sees no further updates and the test
eventually times out.

Release note: None.
renatolabs committed Jul 29, 2022
1 parent 3ed3138 commit b7e4ca0
Showing 6 changed files with 357 additions and 20 deletions.
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/tests/BUILD.bazel
@@ -74,6 +74,7 @@ go_library(
"liquibase_blocklist.go",
"loss_of_quorum_recovery.go",
"many_splits.go",
"mixed_version_cdc.go",
"mixed_version_decommission.go",
"mixed_version_jobs.go",
"mixed_version_schemachange.go",
44 changes: 28 additions & 16 deletions pkg/cmd/roachtest/tests/cdc.go
@@ -296,7 +296,6 @@ func cdcBasicTest(ctx context.Context, t test.Test, c cluster.Cluster, args cdcT
}

func runCDCBank(ctx context.Context, t test.Test, c cluster.Cluster) {

// Make the logs dir on every node to work around the `roachprod get logs`
// spam.
c.Run(ctx, c.All(), `mkdir -p logs`)
@@ -305,21 +304,9 @@ func runCDCBank(ctx context.Context, t test.Test, c cluster.Cluster) {
c.Put(ctx, t.Cockroach(), "./cockroach", crdbNodes)
c.Put(ctx, t.DeprecatedWorkload(), "./workload", workloadNode)
c.Start(ctx, t.L(), option.DefaultStartOpts(), install.MakeClusterSettings(), crdbNodes)
kafka := kafkaManager{
t: t,
c: c,
nodes: kafkaNode,
}
kafka.install(ctx)
if !c.IsLocal() {
// TODO(dan): This test currently connects to kafka from the test
// runner, so kafka needs to advertise the external address. Better
// would be a binary we could run on one of the roachprod machines.
c.Run(ctx, kafka.nodes, `echo "advertised.listeners=PLAINTEXT://`+kafka.consumerURL(ctx)+`" >> `+
filepath.Join(kafka.configDir(), "server.properties"))
}
kafka.start(ctx, "kafka")
defer kafka.stop(ctx)

kafka, cleanup := setupKafka(ctx, t, c, kafkaNode)
defer cleanup()

t.Status("creating kafka topic")
if err := kafka.createTopic(ctx, "bank"); err != nil {
@@ -1837,6 +1824,31 @@ func stopFeeds(db *gosql.DB) {
)`)
}

// setupKafka installs Kafka on the cluster and configures it so that
// the test runner can connect to it. Returns a function to be called
// at the end of the test for stopping Kafka.
func setupKafka(
ctx context.Context, t test.Test, c cluster.Cluster, nodes option.NodeListOption,
) (kafkaManager, func()) {
kafka := kafkaManager{
t: t,
c: c,
nodes: nodes,
}

kafka.install(ctx)
if !c.IsLocal() {
// TODO(dan): This test currently connects to kafka from the test
// runner, so kafka needs to advertise the external address. Better
// would be a binary we could run on one of the roachprod machines.
c.Run(ctx, kafka.nodes, `echo "advertised.listeners=PLAINTEXT://`+kafka.consumerURL(ctx)+`" >> `+
filepath.Join(kafka.configDir(), "server.properties"))
}

kafka.start(ctx, "kafka")
return kafka, func() { kafka.stop(ctx) }
}
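
// Typical usage (sketch, mirroring runCDCBank above): install and
// start Kafka, then defer the returned cleanup so the broker is
// stopped when the test ends:
//
//	kafka, cleanup := setupKafka(ctx, t, c, kafkaNode)
//	defer cleanup()
//	if err := kafka.createTopic(ctx, "bank"); err != nil {
//		t.Fatal(err)
//	}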

type topicConsumer struct {
sarama.Consumer

318 changes: 318 additions & 0 deletions pkg/cmd/roachtest/tests/mixed_version_cdc.go
@@ -0,0 +1,318 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package tests

import (
"context"
"fmt"
"strconv"
"strings"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/version"
)

const (
// how many resolved timestamps to wait for before considering the
// system to be working as intended.
requestedResolved = 20
)

var (
// the CDC target (table). We're running the tpcc workload in this
// test; verifying that the changefeed on the `warehouse` table
// works is sufficient for our purposes
target = "tpcc.warehouse"
)

func registerCDCMixedVersions(r registry.Registry) {
r.Add(registry.TestSpec{
Name: "cdc/mixed-versions",
Owner: registry.OwnerTestEng,
Cluster: r.MakeClusterSpec(5),
Timeout: 30 * time.Minute,
RequiresLicense: true,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runCDCMixedVersions(ctx, t, c, *t.BuildVersion())
},
})
}

// cdcMixedVersionTester implements mixed-version/upgrade testing for
// CDC. It knows how to set up the cluster to run Kafka, monitor
// events, and run validations that ensure changefeeds work during and
// after upgrade.
type cdcMixedVersionTester struct {
ctx context.Context
crdbNodes option.NodeListOption
workloadNodes option.NodeListOption
kafkaNodes option.NodeListOption
monitor cluster.Monitor
verifierDone chan struct{}
kafka kafkaManager
validator *cdctest.CountValidator
cleanup func()
}

func newCDCMixedVersionTester(
ctx context.Context, t test.Test, c cluster.Cluster,
) cdcMixedVersionTester {
crdbNodes := c.Range(1, c.Spec().NodeCount-1)
lastNode := c.Node(c.Spec().NodeCount)

c.Put(ctx, t.Cockroach(), "./cockroach", lastNode)
c.Put(ctx, t.DeprecatedWorkload(), "./workload", lastNode)

return cdcMixedVersionTester{
ctx: ctx,
crdbNodes: crdbNodes,
workloadNodes: lastNode,
kafkaNodes: lastNode,
monitor: c.NewMonitor(ctx, crdbNodes),
verifierDone: make(chan struct{}),
}
}

// StartKafka will install and start Kafka on the configured node. It
// will also create the topic where changefeed events will be sent.
func (cmvt *cdcMixedVersionTester) StartKafka(t test.Test, c cluster.Cluster) {
t.Status("starting Kafka node")
cmvt.kafka, cmvt.cleanup = setupKafka(cmvt.ctx, t, c, cmvt.kafkaNodes)
if err := cmvt.kafka.createTopic(cmvt.ctx, target); err != nil {
t.Fatal(err)
}
}

// Cleanup is supposed to be called at the end of tests that use
// `cdcMixedVersionTester`
func (cmvt *cdcMixedVersionTester) Cleanup() {
if cmvt.cleanup != nil {
cmvt.cleanup()
}
}

// installAndStartTPCCWorkload starts a TPCC workload for the given
// duration. This step does not block; if an error is found while
// running the workload, the test fails.
func (cmvt *cdcMixedVersionTester) installAndStartTPCCWorkload(d string) versionStep {
return func(ctx context.Context, t test.Test, u *versionUpgradeTest) {
t.Status("installing and running workload")
tpcc := tpccWorkload{
sqlNodes: cmvt.crdbNodes,
workloadNodes: cmvt.workloadNodes,
tpccWarehouseCount: 10,
tolerateErrors: true, // we're bringing nodes down while upgrading
}

tpcc.install(ctx, u.c)
time.Sleep(2 * time.Second)
cmvt.monitor.Go(func(ctx context.Context) error {
tpcc.run(ctx, u.c, d)
return nil
})
}
}

// waitForVerifier waits for the underlying CDC verifier to reach the
// desired number of resolved timestamps.
func (cmvt *cdcMixedVersionTester) waitForVerifier() versionStep {
return func(ctx context.Context, t test.Test, u *versionUpgradeTest) {
t.Status("waiting for verifier")
<-cmvt.verifierDone
}
}

// setupVerifier creates a CDC validator to validate that a changefeed
// created on the `tpcc.warehouse` table is able to re-create the
// table somewhere else. It also verifies CDC's ordering
// guarantees. This step will not block, but will start the verifier
// in a separate goroutine. Use `waitForVerifier` to wait for
// the verifier to finish.
func (cmvt *cdcMixedVersionTester) setupVerifier(node int) versionStep {
return func(ctx context.Context, t test.Test, u *versionUpgradeTest) {
t.Status(fmt.Sprintf("setting up changefeed verifier for table %s", target))

// we could just return the error here and let `Wait` return the
// error. However, calling t.Fatal directly lets us stop the test
// earlier
cmvt.monitor.Go(func(ctx context.Context) error {
consumer, err := cmvt.kafka.consumer(ctx, "warehouse")
if err != nil {
t.Fatal(err)
}
defer consumer.Close()

db := u.conn(ctx, t, node)
if _, err := db.Exec(
"CREATE TABLE fprint (" +
"w_id INT NOT NULL PRIMARY KEY, " +
"w_name VARCHAR(10) NOT NULL, " +
"w_street_1 VARCHAR(20) NOT NULL, " +
"w_street_2 VARCHAR(20) NOT NULL, " +
"w_city VARCHAR(20) NOT NULL, " +
"w_state VARCHAR(2) NOT NULL, " +
"w_zip VARCHAR(9) NOT NULL, " +
"w_tax DECIMAL(4, 4) NOT NULL, " +
"w_ytd DECIMAL(12, 2) NOT NULL)",
); err != nil {
t.Fatal(err)
}

fprintV, err := cdctest.NewFingerprintValidator(db, `tpcc.warehouse`, `fprint`, consumer.partitions, 0)
if err != nil {
t.Fatal(err)
}
validators := cdctest.Validators{
cdctest.NewOrderValidator("tpcc.warehouse"),
fprintV,
}
cmvt.validator = cdctest.MakeCountValidator(validators)

for {
m := consumer.Next(ctx)
if m == nil {
t.Fatal(fmt.Errorf("unexpected end of changefeed"))
return nil
}

updated, resolved, err := cdctest.ParseJSONValueTimestamps(m.Value)
if err != nil {
t.Fatal(err)
return nil
}

partitionStr := strconv.Itoa(int(m.Partition))
// errors in the calls to the validator below could lead to
// transient failures if the cluster is upgrading, so we retry
// for 1 minute.
if len(m.Key) > 0 {
if err := retry.ForDuration(1*time.Minute, func() error {
return cmvt.validator.NoteRow(partitionStr, string(m.Key), string(m.Value), updated)
}); err != nil {
t.Fatal(err)
return nil
}
} else {
if err := retry.ForDuration(1*time.Minute, func() error {
return cmvt.validator.NoteResolved(partitionStr, resolved)
}); err != nil {
t.Fatal(err)
return nil
}
t.L().Printf("%d of %d resolved timestamps validated, latest is %s behind realtime",
cmvt.validator.NumResolvedWithRows, requestedResolved, timeutil.Since(resolved.GoTime()))

if cmvt.validator.NumResolvedWithRows >= requestedResolved {
break
}
}
}

close(cmvt.verifierDone)
return nil
})
}
}

// validatorVerify checks if the validator has found any issues at the
// time the function is called.
func (cmvt *cdcMixedVersionTester) validatorVerify() versionStep {
return func(ctx context.Context, t test.Test, u *versionUpgradeTest) {
if failures := cmvt.validator.Failures(); len(failures) > 0 {
t.Fatalf("validator failures:\n%s", strings.Join(failures, "\n"))
}
}
}

// createChangeFeed issues a call to the given node to create a change
// feed for the warehouse table.
func (cmvt *cdcMixedVersionTester) createChangeFeed(node int) versionStep {
return func(ctx context.Context, t test.Test, u *versionUpgradeTest) {
t.Status("creating changefeed")
db := u.conn(ctx, t, node)
cdcClusterSettings(t, sqlutils.MakeSQLRunner(db))

opts := []string{"updated", "resolved"}
if _, err := db.Exec(
fmt.Sprintf("CREATE CHANGEFEED FOR %s INTO $1 WITH %s", target, strings.Join(opts, ", ")),
cmvt.kafka.sinkURL(ctx),
); err != nil {
t.Fatal(err)
}
}
}

func runCDCMixedVersions(
ctx context.Context, t test.Test, c cluster.Cluster, buildVersion version.Version,
) {
predecessorVersion, err := PredecessorVersion(buildVersion)
if err != nil {
t.Fatal(err)
}

tester := newCDCMixedVersionTester(ctx, t, c)
tester.StartKafka(t, c)
defer tester.Cleanup()

// sleep is a step added at different points in the test when we
// want the workload to have some time to run against a specific
// version of the database
sleep := sleepStep(10 * time.Second)

// An empty string will lead to the cockroach binary specified by flag
// `cockroach` to be used.
const mainVersion = ""
newVersionUpgradeTest(c,
uploadAndStartFromCheckpointFixture(tester.crdbNodes, predecessorVersion),
tester.setupVerifier(1),
tester.installAndStartTPCCWorkload("15m"),
waitForUpgradeStep(tester.crdbNodes),

// NB: at this point, cluster and binary version equal predecessorVersion,
// and auto-upgrades are on.
preventAutoUpgradeStep(1),
tester.createChangeFeed(1),

// let the workload run in the old version for a while
sleep,
// Roll the nodes into the new version one by one in random order
binaryUpgradeStep(tester.crdbNodes, mainVersion),
// let the workload run in the new version for a while
sleep,

tester.validatorVerify(),

// Roll back again, which ought to be fine because the cluster upgrade was
// not finalized.
binaryUpgradeStep(tester.crdbNodes, predecessorVersion),
sleep,

tester.validatorVerify(),

// Roll nodes forward and finalize upgrade.
binaryUpgradeStep(tester.crdbNodes, mainVersion),

// allow cluster version to update
allowAutoUpgradeStep(1),
waitForUpgradeStep(tester.crdbNodes),

tester.waitForVerifier(),
tester.validatorVerify(),
).run(ctx, t)
}