Skip to content

Commit

Permalink
cdctest: fix cdc/bank roachtest
Browse files Browse the repository at this point in the history
Before, we were validating the topics of test feed messages
inside the beforeAfter validator which was being used in the
cdc/bank roachtest. That validation should have been put in
its own validator. This commit moves that validation and
the key_in_value validation into their own validators.

Fixes: cockroachdb#139109

Release note: None
  • Loading branch information
aerfrei committed Jan 21, 2025
1 parent 38b28bf commit 84e8d2c
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 73 deletions.
13 changes: 12 additions & 1 deletion pkg/ccl/changefeedccl/cdctest/nemeses.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,14 +275,17 @@ func RunNemesis(
return nil, err
}

baV, err := NewBeforeAfterValidator(db, `foo`, cfo)
baV, err := NewBeforeAfterValidator(db, `foo`)
if err != nil {
return nil, err
}

tV := NewTopicValidator(`foo`, cfo.FullTableName)

validators := Validators{
NewOrderValidator(`foo`),
baV,
tV,
}

if nOp.EnableFpValidator {
Expand All @@ -293,6 +296,14 @@ func RunNemesis(
validators = append(validators, fprintV)
}

if cfo.KeyInValue {
kivV, err := NewKeyInValueValidator(db, `foo`)
if err != nil {
return nil, err
}
validators = append(validators, kivV)
}

ns.v = NewCountValidator(validators)

// Initialize the actual row count, overwriting what the initialization loop did. That
Expand Down
156 changes: 116 additions & 40 deletions pkg/ccl/changefeedccl/cdctest/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,18 +198,14 @@ type beforeAfterValidator struct {
table string
primaryKeyCols []string
resolved map[string]hlc.Timestamp
fullTableName bool
keyInValue bool

failures []string
}

// NewBeforeAfterValidator returns a Validator verifies that the "before" and
// "after" fields in each row agree with the source table when performing AS OF
// SYSTEM TIME lookups before and at the row's timestamp.
func NewBeforeAfterValidator(
sqlDB *gosql.DB, table string, option ChangefeedOption,
) (Validator, error) {
func NewBeforeAfterValidator(sqlDB *gosql.DB, table string) (Validator, error) {
primaryKeyCols, err := fetchPrimaryKeyCols(sqlDB, table)
if err != nil {
return nil, errors.Wrap(err, "fetchPrimaryKeyCols failed")
Expand All @@ -218,8 +214,6 @@ func NewBeforeAfterValidator(
return &beforeAfterValidator{
sqlDB: sqlDB,
table: table,
fullTableName: option.FullTableName,
keyInValue: option.KeyInValue,
primaryKeyCols: primaryKeyCols,
resolved: make(map[string]hlc.Timestamp),
}, nil
Expand All @@ -229,19 +223,6 @@ func NewBeforeAfterValidator(
func (v *beforeAfterValidator) NoteRow(
partition, key, value string, updated hlc.Timestamp, topic string,
) error {
if v.fullTableName {
if topic != fmt.Sprintf(`d.public.%s`, v.table) {
v.failures = append(v.failures, fmt.Sprintf(
"topic %s does not match expected table d.public.%s", topic, v.table,
))
}
} else {
if topic != v.table {
v.failures = append(v.failures, fmt.Sprintf(
"topic %s does not match expected table %s", topic, v.table,
))
}
}
keyJSON, err := json.ParseJSON(key)
if err != nil {
return err
Expand All @@ -258,26 +239,6 @@ func (v *beforeAfterValidator) NoteRow(
return err
}

if v.keyInValue {
keyString := keyJSON.String()
keyInValueJSON, err := valueJSON.FetchValKey("key")
if err != nil {
return err
}

if keyInValueJSON == nil {
v.failures = append(v.failures, fmt.Sprintf(
"no key in value, expected key value %s", keyString))
} else {
keyInValueString := keyInValueJSON.String()
if keyInValueString != keyString {
v.failures = append(v.failures, fmt.Sprintf(
"key in value %s does not match expected key value %s",
keyInValueString, keyString))
}
}
}

afterJSON, err := valueJSON.FetchValKey("after")
if err != nil {
return err
Expand Down Expand Up @@ -388,6 +349,121 @@ func (v *beforeAfterValidator) Failures() []string {
return v.failures
}

type keyInValueValidator struct {
primaryKeyCols []string
failures []string
}

// NewKeyInValueValidator returns a Validator that verifies that the emitted row
// includes the key inside a field named "key" inside the value. It should be
// used only when key_in_value is specified in the changefeed.
func NewKeyInValueValidator(sqlDB *gosql.DB, table string) (Validator, error) {
primaryKeyCols, err := fetchPrimaryKeyCols(sqlDB, table)
if err != nil {
return nil, errors.Wrap(err, "fetchPrimaryKeyCols failed")
}

return &keyInValueValidator{
primaryKeyCols: primaryKeyCols,
}, nil
}

// NoteRow implements the Validator interface.
func (v *keyInValueValidator) NoteRow(
partition, key, value string, updated hlc.Timestamp, topic string,
) error {
keyJSON, err := json.ParseJSON(key)
if err != nil {
return err
}
keyJSONAsArray, ok := keyJSON.AsArray()
if !ok || len(keyJSONAsArray) != len(v.primaryKeyCols) {
return errors.Errorf(
`Not array: expected primary key columns %s got datums %s`,
v.primaryKeyCols, keyJSONAsArray)
}

valueJSON, err := json.ParseJSON(value)
if err != nil {
return err
}

keyString := keyJSON.String()
keyInValueJSON, err := valueJSON.FetchValKey("key")
if err != nil {
return err
}

if keyInValueJSON == nil {
return errors.Errorf(
"no key in value, expected key value %s", keyString)
} else {
keyInValueString := keyInValueJSON.String()
if keyInValueString != keyString {
return errors.Errorf(
"key in value %s does not match expected key value %s",
keyInValueString, keyString)
}
}

return nil
}

// NoteResolved implements the Validator interface.
func (v *keyInValueValidator) NoteResolved(partition string, resolved hlc.Timestamp) error {
return nil
}

// Failures implements the Validator interface.
func (v *keyInValueValidator) Failures() []string {
return v.failures
}

type topicValidator struct {
table string
fullTableName bool

failures []string
}

// NewTopicValidator returns a Validator that verifies that the topic field of
// the row agrees with the name of the table. In the case the full_table_name
// option is specified, it checks the topic includes the db and schema name.
func NewTopicValidator(table string, fullTableName bool) Validator {
return &topicValidator{
table: table,
fullTableName: fullTableName,
}
}

// NoteRow implements the Validator interface.
func (v *topicValidator) NoteRow(
partition, key, value string, updated hlc.Timestamp, topic string,
) error {
if v.fullTableName {
if topic != fmt.Sprintf(`d.public.%s`, v.table) {
v.failures = append(v.failures, fmt.Sprintf(
"topic %s does not match expected table d.public.%s", topic, v.table))
}
} else {
if topic != v.table {
v.failures = append(v.failures, fmt.Sprintf(
"topic %s does not match expected table d.public.%s", topic, v.table))
}
}
return nil
}

// NoteResolved implements the Validator interface.
func (v *topicValidator) NoteResolved(partition string, resolved hlc.Timestamp) error {
return nil
}

// Failures implements the Validator interface.
func (v *topicValidator) Failures() []string {
return v.failures
}

type validatorRow struct {
key, value string
updated hlc.Timestamp
Expand Down
40 changes: 13 additions & 27 deletions pkg/ccl/changefeedccl/cdctest/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,6 @@ func TestOrderValidator(t *testing.T) {
})
}

var standardChangefeedOptions = ChangefeedOption{
FullTableName: false,
KeyInValue: false,
Format: "json",
}

func TestBeforeAfterValidator(t *testing.T) {
defer leaktest.AfterTest(t)()

Expand Down Expand Up @@ -140,30 +134,22 @@ func TestBeforeAfterValidator(t *testing.T) {
}

t.Run(`empty`, func(t *testing.T) {
v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, standardChangefeedOptions)
v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`)
require.NoError(t, err)
assertValidatorFailures(t, v)
})
t.Run(`fullTableName`, func(t *testing.T) {
v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, ChangefeedOption{
FullTableName: true,
KeyInValue: false,
Format: "json",
})
v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`)
require.NoError(t, err)
assertValidatorFailures(t, v)
})
t.Run(`key_in_value`, func(t *testing.T) {
v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, ChangefeedOption{
FullTableName: false,
KeyInValue: true,
Format: "json",
})
v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`)
require.NoError(t, err)
assertValidatorFailures(t, v)
})
t.Run(`during initial`, func(t *testing.T) {
v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, standardChangefeedOptions)
v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`)
require.NoError(t, err)
// "before" is ignored if missing.
noteRow(t, v, `p`, `[1]`, `{"after": {"k":1,"v":1}}`, ts[1], `foo`)
Expand All @@ -178,7 +164,7 @@ func TestBeforeAfterValidator(t *testing.T) {
`' WHERE to_json(k)::TEXT = $1 AND to_json(v)::TEXT = $2 [1 3]`)
})
t.Run(`missing before`, func(t *testing.T) {
v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, standardChangefeedOptions)
v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`)
require.NoError(t, err)
noteResolved(t, v, `p`, ts[0])
// "before" should have been provided.
Expand All @@ -189,7 +175,7 @@ func TestBeforeAfterValidator(t *testing.T) {
`' WHERE to_json(k)::TEXT = $1 [1]`)
})
t.Run(`incorrect before`, func(t *testing.T) {
v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, standardChangefeedOptions)
v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`)
require.NoError(t, err)
noteResolved(t, v, `p`, ts[0])
// "before" provided with wrong value.
Expand All @@ -200,7 +186,7 @@ func TestBeforeAfterValidator(t *testing.T) {
`' WHERE to_json(k)::TEXT = $1 AND to_json(v)::TEXT = $2 [5 10]`)
})
t.Run(`unnecessary before`, func(t *testing.T) {
v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, standardChangefeedOptions)
v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`)
require.NoError(t, err)
noteResolved(t, v, `p`, ts[0])
// "before" provided but should not have been.
Expand All @@ -211,7 +197,7 @@ func TestBeforeAfterValidator(t *testing.T) {
`' WHERE to_json(k)::TEXT = $1 AND to_json(v)::TEXT = $2 [1 1]`)
})
t.Run(`missing after`, func(t *testing.T) {
v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, standardChangefeedOptions)
v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`)
require.NoError(t, err)
noteResolved(t, v, `p`, ts[0])
// "after" should have been provided.
Expand All @@ -222,7 +208,7 @@ func TestBeforeAfterValidator(t *testing.T) {
`' WHERE to_json(k)::TEXT = $1 [1]`)
})
t.Run(`incorrect after`, func(t *testing.T) {
v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, standardChangefeedOptions)
v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`)
require.NoError(t, err)
noteResolved(t, v, `p`, ts[0])
// "after" provided with wrong value.
Expand All @@ -233,7 +219,7 @@ func TestBeforeAfterValidator(t *testing.T) {
`' WHERE to_json(k)::TEXT = $1 AND to_json(v)::TEXT = $2 [1 5]`)
})
t.Run(`unnecessary after`, func(t *testing.T) {
v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, standardChangefeedOptions)
v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`)
require.NoError(t, err)
noteResolved(t, v, `p`, ts[0])
// "after" provided but should not have been.
Expand All @@ -244,7 +230,7 @@ func TestBeforeAfterValidator(t *testing.T) {
`' WHERE to_json(k)::TEXT = $1 AND to_json(v)::TEXT = $2 [1 3]`)
})
t.Run(`incorrect before and after`, func(t *testing.T) {
v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, standardChangefeedOptions)
v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`)
require.NoError(t, err)
noteResolved(t, v, `p`, ts[0])
// "before" and "after" both provided with wrong value.
Expand All @@ -258,7 +244,7 @@ func TestBeforeAfterValidator(t *testing.T) {
`' WHERE to_json(k)::TEXT = $1 AND to_json(v)::TEXT = $2 [1 4]`)
})
t.Run(`correct`, func(t *testing.T) {
v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, standardChangefeedOptions)
v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`)
require.NoError(t, err)
noteResolved(t, v, `p`, ts[0])
noteRow(t, v, `p`, `[1]`, `{}`, ts[0], `foo`)
Expand Down Expand Up @@ -297,7 +283,7 @@ func TestBeforeAfterValidatorForGeometry(t *testing.T) {
t.Fatal(err)
}
}
v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, standardChangefeedOptions)
v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`)
require.NoError(t, err)
assertValidatorFailures(t, v)
noteRow(t, v, `p`, `[1]`, `{"after": {"k":1, "geom":{"coordinates": [1,2], "type": "Point"}}}`, ts[0], `foo`)
Expand Down
6 changes: 1 addition & 5 deletions pkg/cmd/roachtest/tests/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -926,11 +926,7 @@ func runCDCBank(ctx context.Context, t test.Test, c cluster.Cluster) {
if err != nil {
return errors.Wrap(err, "error creating validator")
}
baV, err := cdctest.NewBeforeAfterValidator(db, `bank.bank`, cdctest.ChangefeedOption{
FullTableName: false,
KeyInValue: false,
Format: "json",
})
baV, err := cdctest.NewBeforeAfterValidator(db, `bank.bank`)
if err != nil {
return err
}
Expand Down

0 comments on commit 84e8d2c

Please sign in to comment.