Skip to content

Commit

Permalink
roachtest: add mixed-version testing for CDC
Browse files Browse the repository at this point in the history
This commit adds the `cdc/mixed-versions` roachtest. It builds on top
of the existing infrastructure for mixed version testing in order to
create a test scenario that reproduces the issue observed by DoorDash
when upgrading from 21.2 to 22.1.0.

Following the pattern present in other mixed version tests, this test
starts the cluster at the previous version, upgrades the binaries to
the current version, rolls it back, and then finally performs the
binary upgrade again, allowing the upgrade to finalize.

This roachtest uses the `FingerprintValidator` infrastructure used in
CDC unit tests (and also in the currently skipped `cdc/bank`
roachtest). This validator gives us strong guarantees that the events
produced by a changefeed are correct even in the face of ongoing
upgrades.

This roachtest fails on v22.1.0; once the upgrade to v22.1 is
finalized, the Kafka consumer won't see further updates, and the test
eventually times out.

Release justification: test-only change.
Release note: None.
  • Loading branch information
renatolabs committed Aug 17, 2022
1 parent 5f45e42 commit 3525705
Show file tree
Hide file tree
Showing 10 changed files with 467 additions and 42 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/cdctest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ go_library(
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/randutil",
"//pkg/util/retry",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/cdctest/nemeses.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func RunNemesis(f TestFeedFactory, db *gosql.DB, isSinkless bool) (Validator, er
if err != nil {
return nil, err
}
fprintV, err := NewFingerprintValidator(db, `foo`, scratchTableName, foo.Partitions(), ns.maxTestColumnCount)
fprintV, err := NewFingerprintValidator(db, `foo`, scratchTableName, foo.Partitions(), ns.maxTestColumnCount, false)
if err != nil {
return nil, err
}
Expand Down
78 changes: 69 additions & 9 deletions pkg/ccl/changefeedccl/cdctest/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ import (
"fmt"
"sort"
"strings"
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -63,6 +65,8 @@ var _ Validator = &orderValidator{}
var _ Validator = &noOpValidator{}
var _ StreamValidator = &orderValidator{}

// retryDuration is the duration handed to retry.ForDuration by
// fingerprintValidator.maybeRetry when the validator was created with
// retries enabled.
var retryDuration = 2 * time.Minute

type noOpValidator struct{}

// NoteRow accepts a changed row entry.
Expand Down Expand Up @@ -343,6 +347,11 @@ type fingerprintValidator struct {
fprintTestColumns int
buffer []validatorRow

// shouldRetry indicates whether row updates should be retried (for
// a fixed duration). Typically used when transient errors are
// expected (e.g., if performing an upgrade).
shouldRetry bool

failures []string
}

Expand All @@ -353,7 +362,11 @@ type fingerprintValidator struct {
// will modify `fprint`'s schema to add `maxTestColumnCount` columns to avoid having to
// accommodate schema changes on the fly.
func NewFingerprintValidator(
sqlDB *gosql.DB, origTable, fprintTable string, partitions []string, maxTestColumnCount int,
sqlDB *gosql.DB,
origTable, fprintTable string,
partitions []string,
maxTestColumnCount int,
shouldRetry bool,
) (Validator, error) {
// Fetch the primary keys through information_schema schema inspections so we
// can use them to construct the SQL for DELETEs and also so we can verify
Expand Down Expand Up @@ -395,6 +408,7 @@ func NewFingerprintValidator(
primaryKeyCols: primaryKeyCols,
fprintOrigColumns: fprintOrigColumns,
fprintTestColumns: maxTestColumnCount,
shouldRetry: shouldRetry,
}
v.partitionResolved = make(map[string]hlc.Timestamp)
for _, partition := range partitions {
Expand Down Expand Up @@ -475,10 +489,16 @@ func (v *fingerprintValidator) applyRowUpdate(row validatorRow) (_err error) {
return err
}

if string(primaryKeyJSON) != row.key {
rowKey := row.key
if len(primaryKeyDatums) > 1 {
// format the key using the Go marshaller; otherwise, differences
// in formatting could lead to the comparison below failing
rowKey = asGoJSON(row.key)
}
if string(primaryKeyJSON) != rowKey {
v.failures = append(v.failures,
fmt.Sprintf(`key %s did not match expected key %s for value %s`,
row.key, primaryKeyJSON, row.value))
rowKey, primaryKeyJSON, row.value))
}
} else {
// DELETE
Expand All @@ -491,8 +511,11 @@ func (v *fingerprintValidator) applyRowUpdate(row validatorRow) (_err error) {
args = append(args, datum)
}
}
_, err := v.sqlDB.Exec(stmtBuf.String(), args...)
return err

return v.maybeRetry(func() error {
_, err := v.sqlDB.Exec(stmtBuf.String(), args...)
return err
})
}

// NoteResolved implements the Validator interface.
Expand Down Expand Up @@ -533,6 +556,9 @@ func (v *fingerprintValidator) NoteResolved(partition string, resolved hlc.Times
break
}
row := v.buffer[0]
// NOTE: changes to the validator's state before `applyRowUpdate`
// are safe because, if the operation can fail, the caller should
// be setting the `shouldRetry` field accordingly
v.buffer = v.buffer[1:]

// If we've processed all row updates belonging to the previous row's timestamp,
Expand Down Expand Up @@ -568,15 +594,19 @@ func (v *fingerprintValidator) NoteResolved(partition string, resolved hlc.Times

func (v *fingerprintValidator) fingerprint(ts hlc.Timestamp) error {
var orig string
if err := v.sqlDB.QueryRow(`SELECT IFNULL(fingerprint, 'EMPTY') FROM [
if err := v.maybeRetry(func() error {
return v.sqlDB.QueryRow(`SELECT IFNULL(fingerprint, 'EMPTY') FROM [
SHOW EXPERIMENTAL_FINGERPRINTS FROM TABLE ` + v.origTable + `
] AS OF SYSTEM TIME '` + ts.AsOfSystemTime() + `'`).Scan(&orig); err != nil {
] AS OF SYSTEM TIME '` + ts.AsOfSystemTime() + `'`).Scan(&orig)
}); err != nil {
return err
}
var check string
if err := v.sqlDB.QueryRow(`SELECT IFNULL(fingerprint, 'EMPTY') FROM [
if err := v.maybeRetry(func() error {
return v.sqlDB.QueryRow(`SELECT IFNULL(fingerprint, 'EMPTY') FROM [
SHOW EXPERIMENTAL_FINGERPRINTS FROM TABLE ` + v.fprintTable + `
]`).Scan(&check); err != nil {
]`).Scan(&check)
}); err != nil {
return err
}
if orig != check {
Expand All @@ -591,6 +621,17 @@ func (v *fingerprintValidator) Failures() []string {
return v.failures
}

// maybeRetry runs f, retrying it when the validator was created with
// `shouldRetry` set to `true`. In that case f is handed to
// retry.ForDuration together with retryDuration; otherwise f is
// invoked exactly once and its error is returned as-is. Every access
// to `sqlDB` should be made by closures passed to this function.
func (v *fingerprintValidator) maybeRetry(f func() error) error {
	if !v.shouldRetry {
		// Retries disabled: a single attempt, error surfaced directly.
		return f()
	}

	return retry.ForDuration(retryDuration, f)
}

// Validators abstracts over running multiple `Validator`s at once on the same
// feed.
type Validators []Validator
Expand Down Expand Up @@ -731,3 +772,22 @@ func fetchPrimaryKeyCols(sqlDB *gosql.DB, tableStr string) ([]string, error) {
}
return primaryKeyCols, nil
}

// asGoJSON re-encodes s using the standard library JSON marshaller so
// that two JSON strings whose formatting we do not control can be
// compared textually. The input is decoded into a generic value and
// encoded again; if either step fails, s is returned unchanged.
func asGoJSON(s string) string {
	var decoded interface{}
	if gojson.Unmarshal([]byte(s), &decoded) != nil {
		// Not valid JSON: hand the input back untouched.
		return s
	}

	if encoded, err := gojson.Marshal(decoded); err == nil {
		return string(encoded)
	}
	return s
}
22 changes: 11 additions & 11 deletions pkg/ccl/changefeedccl/cdctest/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,14 +292,14 @@ func TestFingerprintValidator(t *testing.T) {

t.Run(`empty`, func(t *testing.T) {
sqlDB.Exec(t, createTableStmt(`empty`))
v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `empty`, []string{`p`}, testColumns)
v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `empty`, []string{`p`}, testColumns, false)
require.NoError(t, err)
noteResolved(t, v, `p`, ts[0])
assertValidatorFailures(t, v)
})
t.Run(`wrong data`, func(t *testing.T) {
sqlDB.Exec(t, createTableStmt(`wrong_data`))
v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `wrong_data`, []string{`p`}, testColumns)
v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `wrong_data`, []string{`p`}, testColumns, false)
require.NoError(t, err)
noteRow(t, v, ignored, `[1]`, `{"after": {"k":1,"v":10}}`, ts[1])
noteResolved(t, v, `p`, ts[1])
Expand All @@ -310,7 +310,7 @@ func TestFingerprintValidator(t *testing.T) {
})
t.Run(`all resolved`, func(t *testing.T) {
sqlDB.Exec(t, createTableStmt(`all_resolved`))
v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `all_resolved`, []string{`p`}, testColumns)
v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `all_resolved`, []string{`p`}, testColumns, false)
require.NoError(t, err)
if err := v.NoteResolved(`p`, ts[0]); err != nil {
t.Fatal(err)
Expand All @@ -329,7 +329,7 @@ func TestFingerprintValidator(t *testing.T) {
})
t.Run(`rows unsorted`, func(t *testing.T) {
sqlDB.Exec(t, createTableStmt(`rows_unsorted`))
v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `rows_unsorted`, []string{`p`}, testColumns)
v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `rows_unsorted`, []string{`p`}, testColumns, false)
require.NoError(t, err)
noteRow(t, v, ignored, `[1]`, `{"after": {"k":1,"v":3}}`, ts[3])
noteRow(t, v, ignored, `[1]`, `{"after": {"k":1,"v":2}}`, ts[2])
Expand All @@ -341,7 +341,7 @@ func TestFingerprintValidator(t *testing.T) {
})
t.Run(`missed initial`, func(t *testing.T) {
sqlDB.Exec(t, createTableStmt(`missed_initial`))
v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `missed_initial`, []string{`p`}, testColumns)
v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `missed_initial`, []string{`p`}, testColumns, false)
require.NoError(t, err)
noteResolved(t, v, `p`, ts[0])
// Intentionally missing {"k":1,"v":1} at ts[1].
Expand All @@ -357,7 +357,7 @@ func TestFingerprintValidator(t *testing.T) {
})
t.Run(`missed middle`, func(t *testing.T) {
sqlDB.Exec(t, createTableStmt(`missed_middle`))
v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `missed_middle`, []string{`p`}, testColumns)
v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `missed_middle`, []string{`p`}, testColumns, false)
require.NoError(t, err)
noteResolved(t, v, `p`, ts[0])
noteRow(t, v, ignored, `[1]`, `{"after": {"k":1,"v":1}}`, ts[1])
Expand All @@ -375,7 +375,7 @@ func TestFingerprintValidator(t *testing.T) {
})
t.Run(`missed end`, func(t *testing.T) {
sqlDB.Exec(t, createTableStmt(`missed_end`))
v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `missed_end`, []string{`p`}, testColumns)
v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `missed_end`, []string{`p`}, testColumns, false)
require.NoError(t, err)
noteResolved(t, v, `p`, ts[0])
noteRow(t, v, ignored, `[1]`, `{"after": {"k":1,"v":1}}`, ts[1])
Expand All @@ -390,7 +390,7 @@ func TestFingerprintValidator(t *testing.T) {
})
t.Run(`initial scan`, func(t *testing.T) {
sqlDB.Exec(t, createTableStmt(`initial_scan`))
v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `initial_scan`, []string{`p`}, testColumns)
v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `initial_scan`, []string{`p`}, testColumns, false)
require.NoError(t, err)
noteRow(t, v, ignored, `[1]`, `{"after": {"k":1,"v":3}}`, ts[3])
noteRow(t, v, ignored, `[2]`, `{"after": {"k":2,"v":2}}`, ts[3])
Expand All @@ -399,15 +399,15 @@ func TestFingerprintValidator(t *testing.T) {
})
t.Run(`unknown partition`, func(t *testing.T) {
sqlDB.Exec(t, createTableStmt(`unknown_partition`))
v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `unknown_partition`, []string{`p`}, testColumns)
v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `unknown_partition`, []string{`p`}, testColumns, false)
require.NoError(t, err)
if err := v.NoteResolved(`nope`, ts[1]); !testutils.IsError(err, `unknown partition`) {
t.Fatalf(`expected "unknown partition" error got: %+v`, err)
}
})
t.Run(`resolved unsorted`, func(t *testing.T) {
sqlDB.Exec(t, createTableStmt(`resolved_unsorted`))
v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `resolved_unsorted`, []string{`p`}, testColumns)
v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `resolved_unsorted`, []string{`p`}, testColumns, false)
require.NoError(t, err)
noteRow(t, v, ignored, `[1]`, `{"after": {"k":1,"v":1}}`, ts[1])
noteResolved(t, v, `p`, ts[1])
Expand All @@ -417,7 +417,7 @@ func TestFingerprintValidator(t *testing.T) {
})
t.Run(`two partitions`, func(t *testing.T) {
sqlDB.Exec(t, createTableStmt(`two_partitions`))
v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `two_partitions`, []string{`p0`, `p1`}, testColumns)
v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `two_partitions`, []string{`p0`, `p1`}, testColumns, false)
require.NoError(t, err)
noteRow(t, v, ignored, `[1]`, `{"after": {"k":1,"v":1}}`, ts[1])
noteRow(t, v, ignored, `[1]`, `{"after": {"k":1,"v":2}}`, ts[2])
Expand Down
2 changes: 2 additions & 0 deletions pkg/cmd/roachtest/tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ go_library(
"liquibase_blocklist.go",
"loss_of_quorum_recovery.go",
"many_splits.go",
"mixed_version_cdc.go",
"mixed_version_decommission.go",
"mixed_version_jobs.go",
"mixed_version_schemachange.go",
Expand Down Expand Up @@ -199,6 +200,7 @@ go_library(
"//pkg/util/randutil",
"//pkg/util/retry",
"//pkg/util/search",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/version",
"//pkg/workload/histogram",
Expand Down
46 changes: 29 additions & 17 deletions pkg/cmd/roachtest/tests/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,6 @@ func cdcBasicTest(ctx context.Context, t test.Test, c cluster.Cluster, args cdcT
}

func runCDCBank(ctx context.Context, t test.Test, c cluster.Cluster) {

// Make the logs dir on every node to work around the `roachprod get logs`
// spam.
c.Run(ctx, c.All(), `mkdir -p logs`)
Expand All @@ -305,21 +304,9 @@ func runCDCBank(ctx context.Context, t test.Test, c cluster.Cluster) {
c.Put(ctx, t.Cockroach(), "./cockroach", crdbNodes)
c.Put(ctx, t.DeprecatedWorkload(), "./workload", workloadNode)
c.Start(ctx, t.L(), option.DefaultStartOpts(), install.MakeClusterSettings(), crdbNodes)
kafka := kafkaManager{
t: t,
c: c,
nodes: kafkaNode,
}
kafka.install(ctx)
if !c.IsLocal() {
// TODO(dan): This test currently connects to kafka from the test
// runner, so kafka needs to advertise the external address. Better
// would be a binary we could run on one of the roachprod machines.
c.Run(ctx, kafka.nodes, `echo "advertised.listeners=PLAINTEXT://`+kafka.consumerURL(ctx)+`" >> `+
filepath.Join(kafka.configDir(), "server.properties"))
}
kafka.start(ctx, "kafka")
defer kafka.stop(ctx)

kafka, cleanup := setupKafka(ctx, t, c, kafkaNode)
defer cleanup()

t.Status("creating kafka topic")
if err := kafka.createTopic(ctx, "bank"); err != nil {
Expand Down Expand Up @@ -417,7 +404,7 @@ func runCDCBank(ctx context.Context, t test.Test, c cluster.Cluster) {
return errors.Wrap(err, "CREATE TABLE failed")
}

fprintV, err := cdctest.NewFingerprintValidator(db, `bank.bank`, `fprint`, tc.partitions, 0)
fprintV, err := cdctest.NewFingerprintValidator(db, `bank.bank`, `fprint`, tc.partitions, 0, false)
if err != nil {
return errors.Wrap(err, "error creating validator")
}
Expand Down Expand Up @@ -1834,6 +1821,31 @@ func stopFeeds(db *gosql.DB) {
)`)
}

// setupKafka installs and starts Kafka on the given nodes of the
// cluster, configuring it so that the test runner can connect to it.
// It returns the kafkaManager in use along with a function that stops
// Kafka; callers are expected to invoke that function at the end of
// the test.
func setupKafka(
	ctx context.Context, t test.Test, c cluster.Cluster, nodes option.NodeListOption,
) (kafkaManager, func()) {
	manager := kafkaManager{t: t, c: c, nodes: nodes}
	manager.install(ctx)

	if remote := !c.IsLocal(); remote {
		// TODO(dan): This test currently connects to kafka from the test
		// runner, so kafka needs to advertise the external address. Better
		// would be a binary we could run on one of the roachprod machines.
		appendListener := `echo "advertised.listeners=PLAINTEXT://` + manager.consumerURL(ctx) + `" >> `
		serverProps := filepath.Join(manager.configDir(), "server.properties")
		c.Run(ctx, manager.nodes, appendListener+serverProps)
	}

	manager.start(ctx, "kafka")

	stop := func() { manager.stop(ctx) }
	return manager, stop
}

type topicConsumer struct {
sarama.Consumer

Expand Down
Loading

0 comments on commit 3525705

Please sign in to comment.