Skip to content

Commit

Permalink
Merge #94864
Browse files Browse the repository at this point in the history
94864: changefeedccl: Fix flaky test r=miretskiy a=miretskiy

Disable enterprise and webhook sink in TestChangefeedOnlyInitialScan test since these sink (or at least our test implementations) are too slow to process many messages.

Fixes #94816

Release note: None

Co-authored-by: Yevgeniy Miretskiy <[email protected]>
  • Loading branch information
craig[bot] and Yevgeniy Miretskiy committed Jan 11, 2023
2 parents e22fc30 + 795d871 commit 32ef463
Showing 1 changed file with 15 additions and 28 deletions.
43 changes: 15 additions & 28 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6547,54 +6547,41 @@ func TestChangefeedOnlyInitialScan(t *testing.T) {
t.Run(testName, func(t *testing.T) {
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
sqlDB.Exec(t, `INSERT INTO foo (a) SELECT * FROM generate_series(1, 5000);`)

// Most changefeed tests can afford to have a race condition between the initial
// inserts and starting the feed because the output looks the same for an initial
// scan and an insert. For tests with initial_scan=only, though, we can't start the feed
// until it's going to see all the initial inserts in the initial scan.
sqlDB.CheckQueryResultsRetry(t, `SELECT count(*) FROM foo`, [][]string{{`5000`}})
defer func() {
sqlDB.Exec(t, `DROP TABLE foo`)
}()

feed := feed(t, f, changefeedStmt)
defer closeFeed(t, feed)

// Insert few more rows after the feed started -- we should not see those emitted.
sqlDB.Exec(t, "INSERT INTO foo VALUES (5005), (5007), (5009)")

g := ctxgroup.WithContext(context.Background())
var expectedMessages []string
for i := 1; i <= 5000; i++ {
expectedMessages = append(expectedMessages, fmt.Sprintf(
`foo: [%d]->{"after": {"a": %d}}`, i, i,
))
}
var seenMessages []string
g.Go(func() error {
for {
m, err := feed.Next()
if err != nil {
return err
}
seenMessages = append(seenMessages, fmt.Sprintf(`%s: %s->%s`, m.Topic, m.Key, m.Value))
}
})

assertPayloads(t, feed, expectedMessages)

// It would be nice to assert that after we've seen expectedMessages,
// that none of the unexpected messages show up before job termination.
// However, if any of those unexpected messages were emitted, then, we
// would expect this test to flake (hopefully, with an error message
// that makes it clear that the unexpected event happen).
jobFeed := feed.(cdctest.EnterpriseTestFeed)
require.NoError(t, jobFeed.WaitForStatus(func(s jobs.Status) bool {
return s == jobs.StatusSucceeded
}))

closeFeed(t, feed)
sqlDB.Exec(t, `DROP TABLE foo`)
_ = g.Wait()
require.Equal(t, len(expectedMessages), len(seenMessages))
sort.Strings(expectedMessages)
sort.Strings(seenMessages)
for i := range expectedMessages {
require.Equal(t, expectedMessages[i], seenMessages[i])
}
})
}
}

cdcTest(t, testFn, feedTestEnterpriseSinks)
// "enterprise" and "webhook" sink implementations are too slow
// for a test that reads 5k messages.
cdcTest(t, testFn, feedTestEnterpriseSinks, feedTestOmitSinks("enterprise", "webhook"))
}

func TestChangefeedOnlyInitialScanCSV(t *testing.T) {
Expand Down

0 comments on commit 32ef463

Please sign in to comment.