diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 8f1101960600..f3d8f7531e55 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -513,19 +513,16 @@ func TestChangefeedResolvedFrequency(t *testing.T) { // operation. func TestChangefeedInitialScan(t *testing.T) { defer leaktest.AfterTest(t)() - skip.UnderRaceWithIssue(t, 57754, "flaky test") defer log.Scope(t).Close(t) testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) { sqlDB := sqlutils.MakeSQLRunner(db) - sqlDB.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '10ms'`) - t.Run(`no cursor - no initial scan`, func(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE no_initial_scan (a INT PRIMARY KEY)`) sqlDB.Exec(t, `INSERT INTO no_initial_scan VALUES (1)`) noInitialScan := feed(t, f, `CREATE CHANGEFEED FOR no_initial_scan `+ - `WITH no_initial_scan, resolved='10ms'`) + `WITH no_initial_scan, resolved='1s'`) defer closeFeed(t, noInitialScan) expectResolvedTimestamp(t, noInitialScan) sqlDB.Exec(t, `INSERT INTO no_initial_scan VALUES (2)`) @@ -541,7 +538,7 @@ func TestChangefeedInitialScan(t *testing.T) { var i int sqlDB.QueryRow(t, `SELECT count(*), cluster_logical_timestamp() from initial_scan`).Scan(&i, &tsStr) initialScan := feed(t, f, `CREATE CHANGEFEED FOR initial_scan `+ - `WITH initial_scan, resolved='10ms', cursor='`+tsStr+`'`) + `WITH initial_scan, resolved='1s', cursor='`+tsStr+`'`) defer closeFeed(t, initialScan) assertPayloads(t, initialScan, []string{ `initial_scan: [1]->{"after": {"a": 1}}`, diff --git a/pkg/ccl/changefeedccl/testfeed_test.go b/pkg/ccl/changefeedccl/testfeed_test.go index 238adbf248d9..d891faa24256 100644 --- a/pkg/ccl/changefeedccl/testfeed_test.go +++ b/pkg/ccl/changefeedccl/testfeed_test.go @@ -621,8 +621,6 @@ func (c *tableFeed) Next() (*cdctest.TestFeedMessage, error) { var toSend []*cdctest.TestFeedMessage if err := crdb.ExecuteTx(context.Background(), c.sinkDB, nil, func(tx *gosql.Tx) error { - - // Avoid anything that might somehow look like deadlock under stressrace. _, err := tx.Exec("SET TRANSACTION PRIORITY LOW") if err != nil { return err @@ -643,7 +641,6 @@ func (c *tableFeed) Next() (*cdctest.TestFeedMessage, error) { return err } for rows.Next() { - m := &cdctest.TestFeedMessage{} var msgID int64 if err := rows.Scan( @@ -656,10 +653,6 @@ func (c *tableFeed) Next() (*cdctest.TestFeedMessage, error) { // array, which is pretty unexpected. Nil them out before returning. // Either key+value or payload will be set, but not both. if len(m.Key) > 0 || len(m.Value) > 0 { - if isNew := c.markSeen(m); !isNew { - continue - } - m.Resolved = nil } else { m.Key, m.Value = nil, nil @@ -670,7 +663,17 @@ func (c *tableFeed) Next() (*cdctest.TestFeedMessage, error) { }); err != nil { return nil, err } - c.toSend = toSend + + for _, m := range toSend { + // NB: We should not filter seen keys in the query above -- doing so will + // result in flaky tests if txn gets restarted. + if len(m.Key) > 0 { + if isNew := c.markSeen(m); !isNew { + continue + } + } + c.toSend = append(c.toSend, m) + } } }