Skip to content

Commit

Permalink
Merge pull request cockroachdb#111801 from cockroachdb/blathers/backp…
Browse files Browse the repository at this point in the history
…ort-release-22.2-111790

release-22.2: schemafeed: deflake TestTableHistoryIngestionTracking
  • Loading branch information
rafiss authored Nov 10, 2023
2 parents d04906a + c2e9eda commit da4a3b1
Showing 1 changed file with 22 additions and 1 deletion.
23 changes: 22 additions & 1 deletion pkg/ccl/changefeedccl/schemafeed/schema_feed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,20 @@ func TestTableHistoryIngestionTracking(t *testing.T) {
default:
}
}
requireWaitingFor := func(t *testing.T, sf *schemaFeed, ts hlc.Timestamp) {
t.Helper()
testutils.SucceedsSoon(t, func() error {
sf.mu.Lock()
defer sf.mu.Unlock()

for _, w := range sf.mu.waiters {
if w.ts == ts {
return nil
}
}
return errors.Newf("expected to find waiter for ts=%s", ts)
})
}

m := schemaFeed{}
m.mu.highWater = ts(0)
Expand Down Expand Up @@ -93,6 +107,8 @@ func TestTableHistoryIngestionTracking(t *testing.T) {
errCh7 := make(chan error, 1)
go func() { errCh7 <- m.waitForTS(ctx, ts(7)) }()
go func() { errCh6 <- m.waitForTS(ctx, ts(6)) }()
requireWaitingFor(t, &m, ts(7))
requireWaitingFor(t, &m, ts(6))
requireChannelEmpty(t, errCh6)
requireChannelEmpty(t, errCh7)

Expand All @@ -105,6 +121,7 @@ func TestTableHistoryIngestionTracking(t *testing.T) {
require.NoError(t, m.ingestDescriptors(ctx, ts(5), ts(6), nil, validateFn))
require.NoError(t, <-errCh6)
requireChannelEmpty(t, errCh7)
requireWaitingFor(t, &m, ts(7))

// high-water advances again, unblocks errCh7
require.NoError(t, m.ingestDescriptors(ctx, ts(6), ts(7), nil, validateFn))
Expand All @@ -114,6 +131,7 @@ func TestTableHistoryIngestionTracking(t *testing.T) {
errCh8 := make(chan error, 1)
ctxTS8, cancelTS8 := context.WithCancel(ctx)
go func() { errCh8 <- m.waitForTS(ctxTS8, ts(8)) }()
requireWaitingFor(t, &m, ts(8))
requireChannelEmpty(t, errCh8)
cancelTS8()
require.EqualError(t, <-errCh8, `context canceled`)
Expand All @@ -132,6 +150,8 @@ func TestTableHistoryIngestionTracking(t *testing.T) {
errCh9 := make(chan error, 1)
go func() { errCh8 <- m.waitForTS(ctx, ts(8)) }()
go func() { errCh9 <- m.waitForTS(ctx, ts(9)) }()
requireWaitingFor(t, &m, ts(8))
requireWaitingFor(t, &m, ts(9))
requireChannelEmpty(t, errCh8)
requireChannelEmpty(t, errCh9)

Expand All @@ -143,9 +163,10 @@ func TestTableHistoryIngestionTracking(t *testing.T) {
require.EqualError(t, <-errCh9, `descriptor: oh no!`)

// ts 8 is still unknown
requireWaitingFor(t, &m, ts(8))
requireChannelEmpty(t, errCh8)

// always return the earlist error seen (so waiting for ts 10 immediately
// always return the earliest error seen (so waiting for ts 10 immediately
// returns the 9 error now, it returned the ts 10 error above)
require.EqualError(t, m.waitForTS(ctx, ts(9)), `descriptor: oh no!`)

Expand Down

0 comments on commit da4a3b1

Please sign in to comment.