From d32f64124b4f70b349423d2afef8a5358e38aefe Mon Sep 17 00:00:00 2001 From: Steven Danna Date: Tue, 25 May 2021 11:41:49 +0100 Subject: [PATCH] changefeedccl: unskip TestChangefeedDataTTL Much has changed since this test was originally written and since it was originally skipped. Namely, moving to rangefeeds as the underlying driver of changefeeds and the addition of protected timestamps during backfills. When I remove the skip on master, we see 1) The `enterprise` test suite always timing out 2) The `sinkless` test suite almost always passing While the sinkless test suite passed, it wasn't passing in the way some of the comments in the test expected. As written, I believe this test would observe the expected failure only if the GC had occurred before the first call to dist.RangeFeed. In the sinkless case, we were typically blocking the Emit of the first message sent via the backfill. Since the backfill happens before the Rangefeed is started, this meant that the first call to dist.RangeFeed happened after the GC and thus the expected error was returned when we requested a rangefeed starting before the GC. From my reading of this code, that was technically racy and wasn't guaranteed, but on my laptop, the test always passed. Note that this doesn't match the in-code comments which expected that the feed should see at least one of the updates before failing. In the enterprise case, we created a protected timestamp before starting the backfill. This protected timestamp renders our attempt to force the GC useless, meaning that when we start the RangeFeed, our desired timestamp is still available and no error is emitted. This PR - Removes the enterprise test case - Modifies the sinkless test case to more explicitly test the sequence of events it was testing before. - Makes it a test failure if all row updates are seen by the test feed, meaning we don't have to wait for a timeout to fail if this test is still racy somehow. 
Testing the right behaviour of rangefeeds when they fall behind the GC TTL seems like a test that better belongs in the rangefeed code if we need such a test. Fixes #37154 Release note: None --- pkg/ccl/changefeedccl/changefeed_test.go | 82 ++++++++++++++++-------- 1 file changed, 54 insertions(+), 28 deletions(-) diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index d0d35f897380..23b502b8cf70 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -11,6 +11,7 @@ package changefeedccl import ( "context" gosql "database/sql" + "encoding/json" "fmt" "math" "net/http" @@ -2086,10 +2087,7 @@ func TestChangefeedDataTTL(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - skip.WithIssue(t, 37154) - testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) { - ctx := context.Background() // Set a very simple channel-based, wait-and-resume function as the // BeforeEmitRow hook. var shouldWait int32 @@ -2109,61 +2107,89 @@ func TestChangefeedDataTTL(t *testing.T) { sqlDB := sqlutils.MakeSQLRunner(db) - // Create the data table; it will only contain a single row with multiple - // versions. - sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`) + // Create the data table; it will only contain a + // single row with multiple versions. + sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b INT)`) counter := 0 + upsertedValues := make(map[int]struct{}) upsertRow := func() { counter++ - sqlDB.Exec(t, `UPSERT INTO foo (a, b) VALUES (1, $1)`, fmt.Sprintf("version %d", counter)) + sqlDB.Exec(t, `UPSERT INTO foo (a, b) VALUES (1, $1)`, counter) + upsertedValues[counter] = struct{}{} } - // Create the initial version of the row and the changefeed itself. The initial - // version is necessary to prevent CREATE CHANGEFEED itself from hanging. + // Create the initial version of the row and the + // changefeed itself. 
The initial version is necessary + // to ensure that there is at least one row to + // backfill. upsertRow() + + // Set emit trap to ensure the backfill will pause. + // The backfill happens before the construction of the + // rangefeed. Further the backfill sends rows to the + // changeAggregator via an unbuffered channel, so + // blocking the emit should block the scan from + // finishing. + atomic.StoreInt32(&shouldWait, 1) + dataExpiredRows := feed(t, f, "CREATE CHANGEFEED FOR TABLE foo") defer closeFeed(t, dataExpiredRows) - // Set up our emit trap and update the row, which will allow us to "pause" the - // changefeed in order to force a GC. - atomic.StoreInt32(&shouldWait, 1) - upsertRow() + // Ensure our changefeed is started and waiting during the backfill. <-wait - // Upsert two additional versions. One of these will be deleted by the GC - // process before changefeed polling is resumed. + // Upsert additional versions. One of these will be + // deleted by the GC process before the rangefeed is + // started. + upsertRow() upsertRow() upsertRow() - // Force a GC of the table. This should cause both older versions of the - // table to be deleted, with the middle version being lost to the changefeed. + // Force a GC of the table. This should cause both + // versions of the table to be deleted. forceTableGC(t, f.Server(), sqlDB, "d", "foo") // Resume our changefeed normally. atomic.StoreInt32(&shouldWait, 0) resume <- struct{}{} - // Verify that, at some point, Next() returns a "must be after replica GC - // threshold" error. In the common case, that'll be the third call, but - // various conditions will cause RangeFeed to emit duplicates and so it may - // be a few more. - // - // TODO(tbg): this should keep track of the values seen and once we have - // observed all four (which should never happen), fail the test. + // Verify that, at some point, Next() returns a "must + // be after replica GC threshold" error. 
In the common + // case, that'll be the second call: the first + // should return the row from the backfill and the + // second should return the GC threshold error. for { msg, err := dataExpiredRows.Next() if testutils.IsError(err, `must be after replica GC threshold`) { + t.Logf("got expected GC error: %s", err) break } if msg != nil { - log.Infof(ctx, "ignoring message %s", msg) + t.Logf("ignoring message: %s", msg) + var decodedMessage struct { + After struct { + A int + B int + } + } + err = json.Unmarshal(msg.Value, &decodedMessage) + require.NoError(t, err) + delete(upsertedValues, decodedMessage.After.B) + if len(upsertedValues) == 0 { + t.Error("TestFeed emitted all values despite GC running") + return + } } } } - - t.Run("sinkless", sinklessTest(testFn)) - t.Run("enterprise", enterpriseTest(testFn)) + // NOTE(ssd): This test doesn't apply to enterprise + // changefeeds since enterprise changefeeds create a protected + // timestamp before beginning their backfill. + // + // TODO(ssd): Tenant test disabled because this test requires + // the full TestServerInterface. + t.Run("sinkless", sinklessTest(testFn, feedTestNoTenants)) } // TestChangefeedSchemaTTL ensures that changefeeds fail with an error in the case