Skip to content

Commit

Permalink
Merge #32405
Browse files Browse the repository at this point in the history
32405: changefeedccl: run one test with RangeFeeds r=nvanbenschoten,mrtracy a=danhhz

Here we go!

This is perhaps overthinking it, but this `rangefeedTest` impl lets us
run it either as a sinklessTest or an enterpriseTest. The sinklessTest
versions tend to be easier to debug, but two of the tests only work as
enterpriseTests.

Release note: None

Co-authored-by: Daniel Harrison <[email protected]>
  • Loading branch information
craig[bot] and danhhz committed Nov 19, 2018
2 parents 5add5b3 + 3186b04 commit 9417c7c
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 25 deletions.
31 changes: 19 additions & 12 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func TestChangefeedBasics(t *testing.T) {

t.Run(`sinkless`, sinklessTest(testFn))
t.Run(`enterprise`, enterpriseTest(testFn))
t.Run(`rangefeed`, rangefeedTest(sinklessTest, testFn))
}

func TestChangefeedEnvelope(t *testing.T) {
Expand Down Expand Up @@ -579,8 +580,9 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) {
})
sqlDB.Exec(t, `ALTER TABLE add_column_def ADD COLUMN b STRING DEFAULT 'd'`)
assertPayloads(t, addColumnDef, []string{
`add_column_def: [1]->{"a": 1}`,
`add_column_def: [2]->{"a": 2}`,
// TODO(dan): Track duplicates more precisely in sinklessFeed/tableFeed.
// `add_column_def: [1]->{"a": 1}`,
// `add_column_def: [2]->{"a": 2}`,
`add_column_def: [1]->{"a": 1, "b": "d"}`,
`add_column_def: [2]->{"a": 2, "b": "d"}`,
})
Expand All @@ -598,8 +600,9 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) {
})
sqlDB.Exec(t, `ALTER TABLE add_col_comp ADD COLUMN c INT AS (a + 10) STORED`)
assertPayloads(t, addColComp, []string{
`add_col_comp: [1]->{"a": 1, "b": 6}`,
`add_col_comp: [2]->{"a": 2, "b": 7}`,
// TODO(dan): Track duplicates more precisely in sinklessFeed/tableFeed.
// `add_col_comp: [1]->{"a": 1, "b": 6}`,
// `add_col_comp: [2]->{"a": 2, "b": 7}`,
`add_col_comp: [1]->{"a": 1, "b": 6, "c": 11}`,
`add_col_comp: [2]->{"a": 2, "b": 7, "c": 12}`,
})
Expand All @@ -622,8 +625,9 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) {
`drop_column: [1]->{"a": 1}`,
`drop_column: [2]->{"a": 2}`,
`drop_column: [3]->{"a": 3}`,
`drop_column: [1]->{"a": 1}`,
`drop_column: [2]->{"a": 2}`,
// TODO(dan): Track duplicates more precisely in sinklessFeed/tableFeed.
// `drop_column: [1]->{"a": 1}`,
// `drop_column: [2]->{"a": 2}`,
})
})

Expand Down Expand Up @@ -663,17 +667,20 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) {
`multiple_alters: [1]->{"a": 1}`,
`multiple_alters: [2]->{"a": 2}`,
// Scan output for DROP
`multiple_alters: [1]->{"a": 1}`,
`multiple_alters: [2]->{"a": 2}`,
// TODO(dan): Track duplicates more precisely in sinklessFeed/tableFeed.
// `multiple_alters: [1]->{"a": 1}`,
// `multiple_alters: [2]->{"a": 2}`,
// Backfill no-ops for column C
`multiple_alters: [1]->{"a": 1}`,
`multiple_alters: [2]->{"a": 2}`,
// TODO(dan): Track duplicates more precisely in sinklessFeed/tableFeed.
// `multiple_alters: [1]->{"a": 1}`,
// `multiple_alters: [2]->{"a": 2}`,
// Scan output for column C
`multiple_alters: [1]->{"a": 1, "c": "cee"}`,
`multiple_alters: [2]->{"a": 2, "c": "cee"}`,
// Backfill no-ops for column D (C schema change is complete)
`multiple_alters: [1]->{"a": 1, "c": "cee"}`,
`multiple_alters: [2]->{"a": 2, "c": "cee"}`,
// TODO(dan): Track duplicates more precisely in sinklessFeed/tableFeed.
// `multiple_alters: [1]->{"a": 1, "c": "cee"}`,
// `multiple_alters: [2]->{"a": 2, "c": "cee"}`,
// Scan output for column C
`multiple_alters: [1]->{"a": 1, "c": "cee", "d": "dee"}`,
`multiple_alters: [2]->{"a": 2, "c": "cee", "d": "dee"}`,
Expand Down
68 changes: 55 additions & 13 deletions pkg/ccl/changefeedccl/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func (f *sinklessFeedFactory) Feed(t testing.TB, create string, args ...interfac
t.Fatal(err)
}

s := &sinklessFeed{db: f.db}
s := &sinklessFeed{db: f.db, seen: make(map[string]struct{})}
now := timeutil.Now()
var err error
s.rows, err = s.db.Query(create, args...)
Expand Down Expand Up @@ -314,6 +314,7 @@ type sinklessFeed struct {
db *gosql.DB
rows *gosql.Rows
queryID string
seen map[string]struct{}
}

// Partitions returns the single synthetic partition name used by a
// sinkless feed; sinkless changefeeds are not partitioned, so every row
// is reported under this one label.
func (c *sinklessFeed) Partitions() []string {
	return []string{`sinkless`}
}
Expand All @@ -324,18 +325,30 @@ func (c *sinklessFeed) Next(
t.Helper()
partition = `sinkless`
var noKey, noValue, noResolved []byte
if !c.rows.Next() {
return ``, ``, nil, nil, nil, false
}
var maybeTopic gosql.NullString
if err := c.rows.Scan(&maybeTopic, &key, &value); err != nil {
t.Fatal(err)
}
if maybeTopic.Valid {
return maybeTopic.String, partition, key, value, noResolved, true
for {
if !c.rows.Next() {
return ``, ``, nil, nil, nil, false
}
var maybeTopic gosql.NullString
if err := c.rows.Scan(&maybeTopic, &key, &value); err != nil {
t.Fatal(err)
}
if maybeTopic.Valid {
// TODO(dan): This skips duplicates, since they're allowed by the
// semantics of our changefeeds. Now that we're switching to
// RangeFeed, this can actually happen (usually because of splits)
// and cause flakes. However, we really should be de-duping key+ts,
// this is too coarse. Fixme.
seenKey := maybeTopic.String + partition + string(key) + string(value)
if _, ok := c.seen[seenKey]; ok {
continue
}
c.seen[seenKey] = struct{}{}
return maybeTopic.String, partition, key, value, noResolved, true
}
resolvedPayload := value
return ``, partition, noKey, noValue, resolvedPayload, true
}
resolvedPayload := value
return ``, partition, noKey, noValue, resolvedPayload, true
}

func (c *sinklessFeed) Err() error {
Expand Down Expand Up @@ -380,7 +393,10 @@ func (f *tableFeedFactory) Feed(t testing.TB, create string, args ...interface{}
}

sink.Scheme = sinkSchemeExperimentalSQL
c := &tableFeed{db: db, urlCleanup: cleanup, sinkURI: sink.String(), flushCh: f.flushCh}
c := &tableFeed{
db: db, urlCleanup: cleanup, sinkURI: sink.String(), flushCh: f.flushCh,
seen: make(map[string]struct{}),
}
if _, err := c.db.Exec(
`SET CLUSTER SETTING changefeed.experimental_poll_interval = '0ns'`,
); err != nil {
Expand Down Expand Up @@ -419,6 +435,8 @@ type tableFeed struct {

rows *gosql.Rows
jobErr error

seen map[string]struct{}
}

func (c *tableFeed) Partitions() []string {
Expand Down Expand Up @@ -456,10 +474,22 @@ func (c *tableFeed) Next(
if err := c.rows.Scan(&topic, &partition, &msgID, &key, &value, &payload); err != nil {
t.Fatal(err)
}

// Scan turns NULL bytes columns into a 0-length, non-nil byte
// array, which is pretty unexpected. Nil them out before returning.
// Either key+value or payload will be set, but not both.
if len(key) > 0 {
// TODO(dan): This skips duplicates, since they're allowed by
// the semantics of our changefeeds. Now that we're switching to
// RangeFeed, this can actually happen (usually because of
// splits) and cause flakes. However, we really should be
// de-duping key+ts, this is too coarse. Fixme.
seenKey := topic + partition + string(key) + string(value)
if _, ok := c.seen[seenKey]; ok {
continue
}
c.seen[seenKey] = struct{}{}

payload = nil
} else {
key, value = nil, nil
Expand Down Expand Up @@ -700,6 +730,18 @@ func enterpriseTest(testFn func(*testing.T, *gosql.DB, testfeedFactory)) func(*t
}
}

// rangefeedTest adapts an existing test harness (sinklessTest or
// enterpriseTest, passed as metaTestFn) so that testFn runs with the
// kv.rangefeed.enabled cluster setting turned on. The returned closure has
// the same shape as the other harnesses, so it drops into t.Run unchanged.
func rangefeedTest(
	metaTestFn func(func(*testing.T, *gosql.DB, testfeedFactory)) func(*testing.T),
	testFn func(*testing.T, *gosql.DB, testfeedFactory),
) func(*testing.T) {
	// Wrap testFn so RangeFeeds are enabled on the cluster before it runs,
	// then let the chosen harness drive the wrapped function.
	withRangefeeds := func(t *testing.T, db *gosql.DB, f testfeedFactory) {
		sqlutils.MakeSQLRunner(db).Exec(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true`)
		testFn(t, db, f)
	}
	return func(t *testing.T) {
		metaTestFn(withRangefeeds)(t)
	}
}

func forceTableGC(
t testing.TB,
tsi serverutils.TestServerInterface,
Expand Down

0 comments on commit 9417c7c

Please sign in to comment.