Skip to content

Commit

Permalink
changefeedccl: Fix flaky tests.
Browse files Browse the repository at this point in the history
Fix flaky test and re-enable it to run under stress.
The problem was that the transaction executed by the table feed can
be restarted.  If that happens, then we would see the same keys again,
but because we had side effects inside transaction (marking the keys
seen), we would not emit those keys causing the test to be hung.
The stress race was failing because of both transaction restarts and
the 10ms resolved timestamp frequency (with so many resolved timestamps
being generated, the table feed transaction was always getting
restarted).

Fixes #57754
Fixes #65168

Release Notes: None
  • Loading branch information
Yevgeniy Miretskiy committed Jun 1, 2021
1 parent 7628160 commit 10a60f6
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 13 deletions.
7 changes: 2 additions & 5 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,19 +513,16 @@ func TestChangefeedResolvedFrequency(t *testing.T) {
// operation.
func TestChangefeedInitialScan(t *testing.T) {
defer leaktest.AfterTest(t)()
skip.UnderRaceWithIssue(t, 57754, "flaky test")
defer log.Scope(t).Close(t)

testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '10ms'`)

t.Run(`no cursor - no initial scan`, func(t *testing.T) {
sqlDB.Exec(t, `CREATE TABLE no_initial_scan (a INT PRIMARY KEY)`)
sqlDB.Exec(t, `INSERT INTO no_initial_scan VALUES (1)`)

noInitialScan := feed(t, f, `CREATE CHANGEFEED FOR no_initial_scan `+
`WITH no_initial_scan, resolved='10ms'`)
`WITH no_initial_scan, resolved='1s'`)
defer closeFeed(t, noInitialScan)
expectResolvedTimestamp(t, noInitialScan)
sqlDB.Exec(t, `INSERT INTO no_initial_scan VALUES (2)`)
Expand All @@ -541,7 +538,7 @@ func TestChangefeedInitialScan(t *testing.T) {
var i int
sqlDB.QueryRow(t, `SELECT count(*), cluster_logical_timestamp() from initial_scan`).Scan(&i, &tsStr)
initialScan := feed(t, f, `CREATE CHANGEFEED FOR initial_scan `+
`WITH initial_scan, resolved='10ms', cursor='`+tsStr+`'`)
`WITH initial_scan, resolved='1s', cursor='`+tsStr+`'`)
defer closeFeed(t, initialScan)
assertPayloads(t, initialScan, []string{
`initial_scan: [1]->{"after": {"a": 1}}`,
Expand Down
19 changes: 11 additions & 8 deletions pkg/ccl/changefeedccl/testfeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -621,8 +621,6 @@ func (c *tableFeed) Next() (*cdctest.TestFeedMessage, error) {

var toSend []*cdctest.TestFeedMessage
if err := crdb.ExecuteTx(context.Background(), c.sinkDB, nil, func(tx *gosql.Tx) error {

// Avoid anything that might somehow look like deadlock under stressrace.
_, err := tx.Exec("SET TRANSACTION PRIORITY LOW")
if err != nil {
return err
Expand All @@ -643,7 +641,6 @@ func (c *tableFeed) Next() (*cdctest.TestFeedMessage, error) {
return err
}
for rows.Next() {

m := &cdctest.TestFeedMessage{}
var msgID int64
if err := rows.Scan(
Expand All @@ -656,10 +653,6 @@ func (c *tableFeed) Next() (*cdctest.TestFeedMessage, error) {
// array, which is pretty unexpected. Nil them out before returning.
// Either key+value or payload will be set, but not both.
if len(m.Key) > 0 || len(m.Value) > 0 {
if isNew := c.markSeen(m); !isNew {
continue
}

m.Resolved = nil
} else {
m.Key, m.Value = nil, nil
Expand All @@ -670,7 +663,17 @@ func (c *tableFeed) Next() (*cdctest.TestFeedMessage, error) {
}); err != nil {
return nil, err
}
c.toSend = toSend

for _, m := range toSend {
// NB: We should not filter seen keys in the query above -- doing so will
// result in flaky tests if txn gets restarted.
if len(m.Key) > 0 {
if isNew := c.markSeen(m); !isNew {
continue
}
}
c.toSend = append(c.toSend, m)
}
}
}

Expand Down

0 comments on commit 10a60f6

Please sign in to comment.