From 336c02939cb22efade41c04ad3cdfe7a8b76c101 Mon Sep 17 00:00:00 2001 From: Daniel Harrison Date: Tue, 16 Apr 2019 15:22:47 -0700 Subject: [PATCH] changefeedccl: deflake TestChangefeedDataTTL/sinkless This test is yet another instance of too prescriptive an assumption about how many duplicate we will (or won't get). Make it more resilient to them. Closes #36369 Release note: None --- pkg/ccl/changefeedccl/cdctest/testfeed.go | 7 +++++++ pkg/ccl/changefeedccl/changefeed_test.go | 21 ++++++++++++++------- 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/pkg/ccl/changefeedccl/cdctest/testfeed.go b/pkg/ccl/changefeedccl/cdctest/testfeed.go index 08a4fd19485f..cc3e696e41e3 100644 --- a/pkg/ccl/changefeedccl/cdctest/testfeed.go +++ b/pkg/ccl/changefeedccl/cdctest/testfeed.go @@ -50,6 +50,13 @@ type TestFeedMessage struct { Resolved []byte } +func (m TestFeedMessage) String() string { + if m.Resolved != nil { + return string(m.Resolved) + } + return fmt.Sprintf(`%s: %s->%s`, m.Topic, m.Key, m.Value) +} + // TestFeed abstracts over reading from the various types of changefeed sinks. type TestFeed interface { // Partitions returns the domain of values that may be returned as a partition diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 8a7f0989a4f7..c494112cd49b 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -36,6 +36,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/pkg/errors" @@ -1173,6 +1174,7 @@ func TestChangefeedDataTTL(t *testing.T) { defer leaktest.AfterTest(t)() testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) { + ctx := context.Background() // Set a very simple channel-based, wait-and-resume function as the // BeforeEmitRow hook. var shouldWait int32 @@ -1227,13 +1229,18 @@ func TestChangefeedDataTTL(t *testing.T) { atomic.StoreInt32(&shouldWait, 0) resume <- struct{}{} - // Verify that the third call to Next() returns an error (the first is the - // initial row, the second is the first change. The third should detect the - // GC interval mismatch). - _, _ = dataExpiredRows.Next() - _, _ = dataExpiredRows.Next() - if _, err := dataExpiredRows.Next(); !testutils.IsError(err, `must be after replica GC threshold`) { - t.Errorf(`expected "must be after replica GC threshold" error got: %+v`, err) + // Verify that, at some point, Next() returns a "must be after replica GC + // threshold" error. In the common case, that'll be the third call, but + // various conditions will cause RangeFeed to emit duplicates and so it may + // be a few more. + for { + msg, err := dataExpiredRows.Next() + if testutils.IsError(err, `must be after replica GC threshold`) { + break + } + if msg != nil { + log.Infof(ctx, "ignoring message %s", msg) + } } }