Skip to content

Commit

Permalink
Merge #36891
Browse files Browse the repository at this point in the history
36891: changefeedccl: deflake TestChangefeedDataTTL/sinkless r=tbg a=danhhz

This test is yet another instance of too prescriptive an assumption
about how many duplicate we will (or won't) get. Make it more resilient
to them.

Closes #36369

Release note: None

Co-authored-by: Daniel Harrison <[email protected]>
  • Loading branch information
craig[bot] and danhhz committed Apr 17, 2019
2 parents 2e9325c + 336c029 commit 15799ff
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 7 deletions.
7 changes: 7 additions & 0 deletions pkg/ccl/changefeedccl/cdctest/testfeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,13 @@ type TestFeedMessage struct {
Resolved []byte
}

func (m TestFeedMessage) String() string {
if m.Resolved != nil {
return string(m.Resolved)
}
return fmt.Sprintf(`%s: %s->%s`, m.Topic, m.Key, m.Value)
}

// TestFeed abstracts over reading from the various types of changefeed sinks.
type TestFeed interface {
// Partitions returns the domain of values that may be returned as a partition
Expand Down
21 changes: 14 additions & 7 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/pkg/errors"
Expand Down Expand Up @@ -1107,6 +1108,7 @@ func TestChangefeedDataTTL(t *testing.T) {
defer leaktest.AfterTest(t)()

testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) {
ctx := context.Background()
// Set a very simple channel-based, wait-and-resume function as the
// BeforeEmitRow hook.
var shouldWait int32
Expand Down Expand Up @@ -1161,13 +1163,18 @@ func TestChangefeedDataTTL(t *testing.T) {
atomic.StoreInt32(&shouldWait, 0)
resume <- struct{}{}

// Verify that the third call to Next() returns an error (the first is the
// initial row, the second is the first change. The third should detect the
// GC interval mismatch).
_, _ = dataExpiredRows.Next()
_, _ = dataExpiredRows.Next()
if _, err := dataExpiredRows.Next(); !testutils.IsError(err, `must be after replica GC threshold`) {
t.Errorf(`expected "must be after replica GC threshold" error got: %+v`, err)
// Verify that, at some point, Next() returns a "must be after replica GC
// threshold" error. In the common case, that'll be the third call, but
// various conditions will cause RangeFeed to emit duplicates and so it may
// be a few more.
for {
msg, err := dataExpiredRows.Next()
if testutils.IsError(err, `must be after replica GC threshold`) {
break
}
if msg != nil {
log.Infof(ctx, "ignoring message %s", msg)
}
}
}

Expand Down

0 comments on commit 15799ff

Please sign in to comment.