From 10a60f6ec814385c4f4d57de2f9329bc6a33c597 Mon Sep 17 00:00:00 2001 From: Yevgeniy Miretskiy Date: Fri, 28 May 2021 17:25:29 -0400 Subject: [PATCH] changefeedccl: Fix flaky tests. Fix flaky test and re-enable it to run under stress. The problem was that the transaction executed by the table feed can be restarted. If that happens, then we would see the same keys again, but because we had side effects inside transaction (marking the keys seen), we would not emit those keys causing the test to be hung. The stress race was failing because of both transaction restarts and the 10ms resolved timestamp frequency (with so many resolved timestamps being generated, the table feed transaction was always getting restarted). Fixes #57754 Fixes #65168 Release Notes: None --- pkg/ccl/changefeedccl/changefeed_test.go | 7 ++----- pkg/ccl/changefeedccl/testfeed_test.go | 19 +++++++++++-------- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 8f1101960600..f3d8f7531e55 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -513,19 +513,16 @@ func TestChangefeedResolvedFrequency(t *testing.T) { // operation. func TestChangefeedInitialScan(t *testing.T) { defer leaktest.AfterTest(t)() - skip.UnderRaceWithIssue(t, 57754, "flaky test") defer log.Scope(t).Close(t) testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) { sqlDB := sqlutils.MakeSQLRunner(db) - sqlDB.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '10ms'`) - t.Run(`no cursor - no initial scan`, func(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE no_initial_scan (a INT PRIMARY KEY)`) sqlDB.Exec(t, `INSERT INTO no_initial_scan VALUES (1)`) noInitialScan := feed(t, f, `CREATE CHANGEFEED FOR no_initial_scan `+ - `WITH no_initial_scan, resolved='10ms'`) + `WITH no_initial_scan, resolved='1s'`) defer closeFeed(t, noInitialScan) expectResolvedTimestamp(t, noInitialScan) sqlDB.Exec(t, `INSERT INTO no_initial_scan VALUES (2)`) @@ -541,7 +538,7 @@ func TestChangefeedInitialScan(t *testing.T) { var i int sqlDB.QueryRow(t, `SELECT count(*), cluster_logical_timestamp() from initial_scan`).Scan(&i, &tsStr) initialScan := feed(t, f, `CREATE CHANGEFEED FOR initial_scan `+ - `WITH initial_scan, resolved='10ms', cursor='`+tsStr+`'`) + `WITH initial_scan, resolved='1s', cursor='`+tsStr+`'`) defer closeFeed(t, initialScan) assertPayloads(t, initialScan, []string{ `initial_scan: [1]->{"after": {"a": 1}}`, diff --git a/pkg/ccl/changefeedccl/testfeed_test.go b/pkg/ccl/changefeedccl/testfeed_test.go index 238adbf248d9..d891faa24256 100644 --- a/pkg/ccl/changefeedccl/testfeed_test.go +++ b/pkg/ccl/changefeedccl/testfeed_test.go @@ -621,8 +621,6 @@ func (c *tableFeed) Next() (*cdctest.TestFeedMessage, error) { var toSend []*cdctest.TestFeedMessage if err := crdb.ExecuteTx(context.Background(), c.sinkDB, nil, func(tx *gosql.Tx) error { - - // Avoid anything that might somehow look like deadlock under stressrace. _, err := tx.Exec("SET TRANSACTION PRIORITY LOW") if err != nil { return err @@ -643,7 +641,6 @@ func (c *tableFeed) Next() (*cdctest.TestFeedMessage, error) { return err } for rows.Next() { - m := &cdctest.TestFeedMessage{} var msgID int64 if err := rows.Scan( @@ -656,10 +653,6 @@ func (c *tableFeed) Next() (*cdctest.TestFeedMessage, error) { // array, which is pretty unexpected. Nil them out before returning. // Either key+value or payload will be set, but not both. if len(m.Key) > 0 || len(m.Value) > 0 { - if isNew := c.markSeen(m); !isNew { - continue - } - m.Resolved = nil } else { m.Key, m.Value = nil, nil @@ -670,7 +663,17 @@ func (c *tableFeed) Next() (*cdctest.TestFeedMessage, error) { }); err != nil { return nil, err } - c.toSend = toSend + + for _, m := range toSend { + // NB: We should not filter seen keys in the query above -- doing so will + // result in flaky tests if txn gets restarted. + if len(m.Key) > 0 { + if isNew := c.markSeen(m); !isNew { + continue + } + } + c.toSend = append(c.toSend, m) + } } }