diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index d0d35f897380..23b502b8cf70 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -11,6 +11,7 @@ package changefeedccl import ( "context" gosql "database/sql" + "encoding/json" "fmt" "math" "net/http" @@ -2086,10 +2087,7 @@ func TestChangefeedDataTTL(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - skip.WithIssue(t, 37154) - testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) { - ctx := context.Background() // Set a very simple channel-based, wait-and-resume function as the // BeforeEmitRow hook. var shouldWait int32 @@ -2109,61 +2107,89 @@ func TestChangefeedDataTTL(t *testing.T) { sqlDB := sqlutils.MakeSQLRunner(db) - // Create the data table; it will only contain a single row with multiple - // versions. - sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`) + // Create the data table; it will only contain a + // single row with multiple versions. + sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b INT)`) counter := 0 + upsertedValues := make(map[int]struct{}) upsertRow := func() { counter++ - sqlDB.Exec(t, `UPSERT INTO foo (a, b) VALUES (1, $1)`, fmt.Sprintf("version %d", counter)) + sqlDB.Exec(t, `UPSERT INTO foo (a, b) VALUES (1, $1)`, counter) + upsertedValues[counter] = struct{}{} } - // Create the initial version of the row and the changefeed itself. The initial - // version is necessary to prevent CREATE CHANGEFEED itself from hanging. + // Create the initial version of the row and the + // changefeed itself. The initial version is necessary + // to ensure that there is at least one row to + // backfill. upsertRow() + + // Set emit trap to ensure the backfill will pause. + // The backfill happens before the construction of the + // rangefeed. Further the backfill sends rows to the + // changeAggregator via an unbuffered channel, so + // blocking the emit should block the scan from + // finishing. + atomic.StoreInt32(&shouldWait, 1) + dataExpiredRows := feed(t, f, "CREATE CHANGEFEED FOR TABLE foo") defer closeFeed(t, dataExpiredRows) - // Set up our emit trap and update the row, which will allow us to "pause" the - // changefeed in order to force a GC. - atomic.StoreInt32(&shouldWait, 1) - upsertRow() + // Ensure our changefeed is started and waiting during the backfill. <-wait - // Upsert two additional versions. One of these will be deleted by the GC - // process before changefeed polling is resumed. + // Upsert additional versions. One of these will be + // deleted by the GC process before the rangefeed is + // started. + upsertRow() upsertRow() upsertRow() - // Force a GC of the table. This should cause both older versions of the - // table to be deleted, with the middle version being lost to the changefeed. + // Force a GC of the table. This should cause both + // versions of the table to be deleted. forceTableGC(t, f.Server(), sqlDB, "d", "foo") // Resume our changefeed normally. atomic.StoreInt32(&shouldWait, 0) resume <- struct{}{} - // Verify that, at some point, Next() returns a "must be after replica GC - // threshold" error. In the common case, that'll be the third call, but - // various conditions will cause RangeFeed to emit duplicates and so it may - // be a few more. - // - // TODO(tbg): this should keep track of the values seen and once we have - // observed all four (which should never happen), fail the test. + // Verify that, at some point, Next() returns a "must + // be after replica GC threshold" error. In the common + // case, that'll be the second call, the first will + // should return the row from the backfill and the + // second should be returning for { msg, err := dataExpiredRows.Next() if testutils.IsError(err, `must be after replica GC threshold`) { + t.Logf("got expected GC error: %s", err) break } if msg != nil { - log.Infof(ctx, "ignoring message %s", msg) + t.Logf("ignoring message: %s", msg) + var decodedMessage struct { + After struct { + A int + B int + } + } + err = json.Unmarshal(msg.Value, &decodedMessage) + require.NoError(t, err) + delete(upsertedValues, decodedMessage.After.B) + if len(upsertedValues) == 0 { + t.Error("TestFeed emitted all values despite GC running") + return + } } } } - - t.Run("sinkless", sinklessTest(testFn)) - t.Run("enterprise", enterpriseTest(testFn)) + // NOTE(ssd): This test doesn't apply to enterprise + // changefeeds since enterprise changefeeds create a protected + // timestamp before beginning their backfill. + // + // TODO(ssd): Tenant test disabled because this test requires + // the fully TestServerInterface. + t.Run("sinkless", sinklessTest(testFn, feedTestNoTenants)) } // TestChangefeedSchemaTTL ensures that changefeeds fail with an error in the case