From 84e8d2cb3f17fbff12e8aef95b7f87d4b01f27e8 Mon Sep 17 00:00:00 2001 From: Aerin Freilich Date: Fri, 17 Jan 2025 13:24:39 -0500 Subject: [PATCH] cdctest: fix cdc/bank roachtest Before, we were validating the topics of test feed messages inside the beforeAfter validator which was being used in the cdc/bank roachtest. That validation should have been put in its own validator. This commit moves that validation and the key_in_value validation into their own validators. Fixes: #139109 Release note: None --- pkg/ccl/changefeedccl/cdctest/nemeses.go | 13 +- pkg/ccl/changefeedccl/cdctest/validator.go | 156 +++++++++++++----- .../changefeedccl/cdctest/validator_test.go | 40 ++--- pkg/cmd/roachtest/tests/cdc.go | 6 +- 4 files changed, 142 insertions(+), 73 deletions(-) diff --git a/pkg/ccl/changefeedccl/cdctest/nemeses.go b/pkg/ccl/changefeedccl/cdctest/nemeses.go index 1c0c3baa436d..85a69bdfa121 100644 --- a/pkg/ccl/changefeedccl/cdctest/nemeses.go +++ b/pkg/ccl/changefeedccl/cdctest/nemeses.go @@ -275,14 +275,17 @@ func RunNemesis( return nil, err } - baV, err := NewBeforeAfterValidator(db, `foo`, cfo) + baV, err := NewBeforeAfterValidator(db, `foo`) if err != nil { return nil, err } + tV := NewTopicValidator(`foo`, cfo.FullTableName) + validators := Validators{ NewOrderValidator(`foo`), baV, + tV, } if nOp.EnableFpValidator { @@ -293,6 +296,14 @@ func RunNemesis( validators = append(validators, fprintV) } + if cfo.KeyInValue { + kivV, err := NewKeyInValueValidator(db, `foo`) + if err != nil { + return nil, err + } + validators = append(validators, kivV) + } + ns.v = NewCountValidator(validators) // Initialize the actual row count, overwriting what the initialization loop did. That diff --git a/pkg/ccl/changefeedccl/cdctest/validator.go b/pkg/ccl/changefeedccl/cdctest/validator.go index 96e6d8ddd5af..8be90185e585 100644 --- a/pkg/ccl/changefeedccl/cdctest/validator.go +++ b/pkg/ccl/changefeedccl/cdctest/validator.go @@ -198,8 +198,6 @@ type beforeAfterValidator struct { table string primaryKeyCols []string resolved map[string]hlc.Timestamp - fullTableName bool - keyInValue bool failures []string } @@ -207,9 +205,7 @@ type beforeAfterValidator struct { // NewBeforeAfterValidator returns a Validator verifies that the "before" and // "after" fields in each row agree with the source table when performing AS OF // SYSTEM TIME lookups before and at the row's timestamp. -func NewBeforeAfterValidator( - sqlDB *gosql.DB, table string, option ChangefeedOption, -) (Validator, error) { +func NewBeforeAfterValidator(sqlDB *gosql.DB, table string) (Validator, error) { primaryKeyCols, err := fetchPrimaryKeyCols(sqlDB, table) if err != nil { return nil, errors.Wrap(err, "fetchPrimaryKeyCols failed") @@ -218,8 +214,6 @@ func NewBeforeAfterValidator( return &beforeAfterValidator{ sqlDB: sqlDB, table: table, - fullTableName: option.FullTableName, - keyInValue: option.KeyInValue, primaryKeyCols: primaryKeyCols, resolved: make(map[string]hlc.Timestamp), }, nil @@ -229,19 +223,6 @@ func NewBeforeAfterValidator( func (v *beforeAfterValidator) NoteRow( partition, key, value string, updated hlc.Timestamp, topic string, ) error { - if v.fullTableName { - if topic != fmt.Sprintf(`d.public.%s`, v.table) { - v.failures = append(v.failures, fmt.Sprintf( - "topic %s does not match expected table d.public.%s", topic, v.table, - )) - } - } else { - if topic != v.table { - v.failures = append(v.failures, fmt.Sprintf( - "topic %s does not match expected table %s", topic, v.table, - )) - } - } keyJSON, err := json.ParseJSON(key) if err != nil { return err @@ -258,26 +239,6 @@ func (v *beforeAfterValidator) NoteRow( return err } - if v.keyInValue { - keyString := keyJSON.String() - keyInValueJSON, err := valueJSON.FetchValKey("key") - if err != nil { - return err - } - - if keyInValueJSON == nil { - v.failures = append(v.failures, fmt.Sprintf( - "no key in value, expected key value %s", keyString)) - } else { - keyInValueString := keyInValueJSON.String() - if keyInValueString != keyString { - v.failures = append(v.failures, fmt.Sprintf( - "key in value %s does not match expected key value %s", - keyInValueString, keyString)) - } - } - } - afterJSON, err := valueJSON.FetchValKey("after") if err != nil { return err @@ -388,6 +349,121 @@ func (v *beforeAfterValidator) Failures() []string { return v.failures } +type keyInValueValidator struct { + primaryKeyCols []string + failures []string +} + +// NewKeyInValueValidator returns a Validator that verifies that the emitted row +// includes the key inside a field named "key" inside the value. It should be +// used only when key_in_value is specified in the changefeed. +func NewKeyInValueValidator(sqlDB *gosql.DB, table string) (Validator, error) { + primaryKeyCols, err := fetchPrimaryKeyCols(sqlDB, table) + if err != nil { + return nil, errors.Wrap(err, "fetchPrimaryKeyCols failed") + } + + return &keyInValueValidator{ + primaryKeyCols: primaryKeyCols, + }, nil +} + +// NoteRow implements the Validator interface. +func (v *keyInValueValidator) NoteRow( + partition, key, value string, updated hlc.Timestamp, topic string, +) error { + keyJSON, err := json.ParseJSON(key) + if err != nil { + return err + } + keyJSONAsArray, ok := keyJSON.AsArray() + if !ok || len(keyJSONAsArray) != len(v.primaryKeyCols) { + return errors.Errorf( + `Not array: expected primary key columns %s got datums %s`, + v.primaryKeyCols, keyJSONAsArray) + } + + valueJSON, err := json.ParseJSON(value) + if err != nil { + return err + } + + keyString := keyJSON.String() + keyInValueJSON, err := valueJSON.FetchValKey("key") + if err != nil { + return err + } + + if keyInValueJSON == nil { + return errors.Errorf( + "no key in value, expected key value %s", keyString) + } else { + keyInValueString := keyInValueJSON.String() + if keyInValueString != keyString { + return errors.Errorf( + "key in value %s does not match expected key value %s", + keyInValueString, keyString) + } + } + + return nil +} + +// NoteResolved implements the Validator interface. +func (v *keyInValueValidator) NoteResolved(partition string, resolved hlc.Timestamp) error { + return nil +} + +// Failures implements the Validator interface. +func (v *keyInValueValidator) Failures() []string { + return v.failures +} + +type topicValidator struct { + table string + fullTableName bool + + failures []string +} + +// NewTopicValidator returns a Validator that verifies that the topic field of +// the row agrees with the name of the table. In the case the full_table_name +// option is specified, it checks the topic includes the db and schema name. +func NewTopicValidator(table string, fullTableName bool) Validator { + return &topicValidator{ + table: table, + fullTableName: fullTableName, + } +} + +// NoteRow implements the Validator interface. +func (v *topicValidator) NoteRow( + partition, key, value string, updated hlc.Timestamp, topic string, +) error { + if v.fullTableName { + if topic != fmt.Sprintf(`d.public.%s`, v.table) { + v.failures = append(v.failures, fmt.Sprintf( + "topic %s does not match expected table d.public.%s", topic, v.table)) + } + } else { + if topic != v.table { + v.failures = append(v.failures, fmt.Sprintf( + "topic %s does not match expected table d.public.%s", topic, v.table)) + } + } + return nil +} + +// NoteResolved implements the Validator interface. +func (v *topicValidator) NoteResolved(partition string, resolved hlc.Timestamp) error { + return nil +} + +// Failures implements the Validator interface. +func (v *topicValidator) Failures() []string { + return v.failures +} + type validatorRow struct { key, value string updated hlc.Timestamp diff --git a/pkg/ccl/changefeedccl/cdctest/validator_test.go b/pkg/ccl/changefeedccl/cdctest/validator_test.go index ae8fa8ba85ad..7eddcfeb0e2a 100644 --- a/pkg/ccl/changefeedccl/cdctest/validator_test.go +++ b/pkg/ccl/changefeedccl/cdctest/validator_test.go @@ -99,12 +99,6 @@ func TestOrderValidator(t *testing.T) { }) } -var standardChangefeedOptions = ChangefeedOption{ - FullTableName: false, - KeyInValue: false, - Format: "json", -} - func TestBeforeAfterValidator(t *testing.T) { defer leaktest.AfterTest(t)() @@ -140,30 +134,22 @@ func TestBeforeAfterValidator(t *testing.T) { } t.Run(`empty`, func(t *testing.T) { - v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, standardChangefeedOptions) + v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`) require.NoError(t, err) assertValidatorFailures(t, v) }) t.Run(`fullTableName`, func(t *testing.T) { - v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, ChangefeedOption{ - FullTableName: true, - KeyInValue: false, - Format: "json", - }) + v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`) require.NoError(t, err) assertValidatorFailures(t, v) }) t.Run(`key_in_value`, func(t *testing.T) { - v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, ChangefeedOption{ - FullTableName: false, - KeyInValue: true, - Format: "json", - }) + v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`) require.NoError(t, err) assertValidatorFailures(t, v) }) t.Run(`during initial`, func(t *testing.T) { - v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, standardChangefeedOptions) + v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`) require.NoError(t, err) // "before" is ignored if missing. noteRow(t, v, `p`, `[1]`, `{"after": {"k":1,"v":1}}`, ts[1], `foo`) @@ -178,7 +164,7 @@ func TestBeforeAfterValidator(t *testing.T) { `' WHERE to_json(k)::TEXT = $1 AND to_json(v)::TEXT = $2 [1 3]`) }) t.Run(`missing before`, func(t *testing.T) { - v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, standardChangefeedOptions) + v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`) require.NoError(t, err) noteResolved(t, v, `p`, ts[0]) // "before" should have been provided. @@ -189,7 +175,7 @@ func TestBeforeAfterValidator(t *testing.T) { `' WHERE to_json(k)::TEXT = $1 [1]`) }) t.Run(`incorrect before`, func(t *testing.T) { - v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, standardChangefeedOptions) + v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`) require.NoError(t, err) noteResolved(t, v, `p`, ts[0]) // "before" provided with wrong value. @@ -200,7 +186,7 @@ func TestBeforeAfterValidator(t *testing.T) { `' WHERE to_json(k)::TEXT = $1 AND to_json(v)::TEXT = $2 [5 10]`) }) t.Run(`unnecessary before`, func(t *testing.T) { - v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, standardChangefeedOptions) + v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`) require.NoError(t, err) noteResolved(t, v, `p`, ts[0]) // "before" provided but should not have been. @@ -211,7 +197,7 @@ func TestBeforeAfterValidator(t *testing.T) { `' WHERE to_json(k)::TEXT = $1 AND to_json(v)::TEXT = $2 [1 1]`) }) t.Run(`missing after`, func(t *testing.T) { - v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, standardChangefeedOptions) + v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`) require.NoError(t, err) noteResolved(t, v, `p`, ts[0]) // "after" should have been provided. @@ -222,7 +208,7 @@ func TestBeforeAfterValidator(t *testing.T) { `' WHERE to_json(k)::TEXT = $1 [1]`) }) t.Run(`incorrect after`, func(t *testing.T) { - v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, standardChangefeedOptions) + v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`) require.NoError(t, err) noteResolved(t, v, `p`, ts[0]) // "after" provided with wrong value. @@ -233,7 +219,7 @@ func TestBeforeAfterValidator(t *testing.T) { `' WHERE to_json(k)::TEXT = $1 AND to_json(v)::TEXT = $2 [1 5]`) }) t.Run(`unnecessary after`, func(t *testing.T) { - v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, standardChangefeedOptions) + v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`) require.NoError(t, err) noteResolved(t, v, `p`, ts[0]) // "after" provided but should not have been. @@ -244,7 +230,7 @@ func TestBeforeAfterValidator(t *testing.T) { `' WHERE to_json(k)::TEXT = $1 AND to_json(v)::TEXT = $2 [1 3]`) }) t.Run(`incorrect before and after`, func(t *testing.T) { - v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, standardChangefeedOptions) + v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`) require.NoError(t, err) noteResolved(t, v, `p`, ts[0]) // "before" and "after" both provided with wrong value. @@ -258,7 +244,7 @@ func TestBeforeAfterValidator(t *testing.T) { `' WHERE to_json(k)::TEXT = $1 AND to_json(v)::TEXT = $2 [1 4]`) }) t.Run(`correct`, func(t *testing.T) { - v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, standardChangefeedOptions) + v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`) require.NoError(t, err) noteResolved(t, v, `p`, ts[0]) noteRow(t, v, `p`, `[1]`, `{}`, ts[0], `foo`) @@ -297,7 +283,7 @@ func TestBeforeAfterValidatorForGeometry(t *testing.T) { t.Fatal(err) } } - v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, standardChangefeedOptions) + v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`) require.NoError(t, err) assertValidatorFailures(t, v) noteRow(t, v, `p`, `[1]`, `{"after": {"k":1, "geom":{"coordinates": [1,2], "type": "Point"}}}`, ts[0], `foo`) diff --git a/pkg/cmd/roachtest/tests/cdc.go b/pkg/cmd/roachtest/tests/cdc.go index d79972d417e3..53e03bd6c72c 100644 --- a/pkg/cmd/roachtest/tests/cdc.go +++ b/pkg/cmd/roachtest/tests/cdc.go @@ -926,11 +926,7 @@ func runCDCBank(ctx context.Context, t test.Test, c cluster.Cluster) { if err != nil { return errors.Wrap(err, "error creating validator") } - baV, err := cdctest.NewBeforeAfterValidator(db, `bank.bank`, cdctest.ChangefeedOption{ - FullTableName: false, - KeyInValue: false, - Format: "json", - }) + baV, err := cdctest.NewBeforeAfterValidator(db, `bank.bank`) if err != nil { return err }