Skip to content

Commit

Permalink
changefeedccl: unskip TestChangefeedDataTTL
Browse files Browse the repository at this point in the history
Much has changed since this test was originally written and since it
was originally skipped.

Namely, moving to rangefeeds as the underlying driver of changefeeds
and the addition of protected timestamps during backfills.

When I uncomment the skip on master, we see

1) The `enterprise` test suite always timing out
2) The `sinkless` test suite almost always passing

While the sinkless test suite passed, it wasn't passing in the way
some of the comments in the test expected.

As written, I believe this test would observe the expected failure
only if the GC had occurred before the first call to dist.RangeFeed.

In the sinkless case, we were typically blocking the Emit of the first
message sent via the backfill. Since the backfill happens before the
Rangefeed is started, this meant that the first call to dist.RangeFeed
happened after the GC and thus the expected errors was returned when
we requested a rangefeed starting before the GC. From my reading of
this code, that was technically racy and wasn't guaranteed, but on my
laptop, the test always passed. Note that this doesn't match the
in-code comments which expected that the feed should see at least one
of the updates before failing.

In the enterprise case, we created a protected timestamp before
starting the backfill. This protected timestamp renders our attempt to
force the GC useless, meaning that when we start the RangeFeed, our
desired timestamp is still available and no error is emitted.

This PR

- Removes the enterprise test case

- Modifies the sinkless test case to more explicitly test the sequence
  of events it was testing before.

- Makes it a test failure if all row updates are seen by the test
feed, meaning we don't have to wait for a timeout to fail if this test
is still racy somehow.

Testing the right behaviour of rangefeeds when they fall behind the GC
TTL seems like a test that better belongs in the rangefeed code if we
need such a test.

Fixes #37154

Release note: None
  • Loading branch information
stevendanna committed May 25, 2021
1 parent 4ceda0b commit d32f641
Showing 1 changed file with 54 additions and 28 deletions.
82 changes: 54 additions & 28 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package changefeedccl
import (
"context"
gosql "database/sql"
"encoding/json"
"fmt"
"math"
"net/http"
Expand Down Expand Up @@ -2086,10 +2087,7 @@ func TestChangefeedDataTTL(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.WithIssue(t, 37154)

testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) {
ctx := context.Background()
// Set a very simple channel-based, wait-and-resume function as the
// BeforeEmitRow hook.
var shouldWait int32
Expand All @@ -2109,61 +2107,89 @@ func TestChangefeedDataTTL(t *testing.T) {

sqlDB := sqlutils.MakeSQLRunner(db)

// Create the data table; it will only contain a single row with multiple
// versions.
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
// Create the data table; it will only contain a
// single row with multiple versions.
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b INT)`)

counter := 0
upsertedValues := make(map[int]struct{})
upsertRow := func() {
counter++
sqlDB.Exec(t, `UPSERT INTO foo (a, b) VALUES (1, $1)`, fmt.Sprintf("version %d", counter))
sqlDB.Exec(t, `UPSERT INTO foo (a, b) VALUES (1, $1)`, counter)
upsertedValues[counter] = struct{}{}
}

// Create the initial version of the row and the changefeed itself. The initial
// version is necessary to prevent CREATE CHANGEFEED itself from hanging.
// Create the initial version of the row and the
// changefeed itself. The initial version is necessary
// to ensure that there is at least one row to
// backfill.
upsertRow()

// Set emit trap to ensure the backfill will pause.
// The backfill happens before the construction of the
// rangefeed. Further the backfill sends rows to the
// changeAggregator via an unbuffered channel, so
// blocking the emit should block the scan from
// finishing.
atomic.StoreInt32(&shouldWait, 1)

dataExpiredRows := feed(t, f, "CREATE CHANGEFEED FOR TABLE foo")
defer closeFeed(t, dataExpiredRows)

// Set up our emit trap and update the row, which will allow us to "pause" the
// changefeed in order to force a GC.
atomic.StoreInt32(&shouldWait, 1)
upsertRow()
// Ensure our changefeed is started and waiting during the backfill.
<-wait

// Upsert two additional versions. One of these will be deleted by the GC
// process before changefeed polling is resumed.
// Upsert additional versions. One of these will be
// deleted by the GC process before the rangefeed is
// started.
upsertRow()
upsertRow()
upsertRow()

// Force a GC of the table. This should cause both older versions of the
// table to be deleted, with the middle version being lost to the changefeed.
// Force a GC of the table. This should cause both
// versions of the table to be deleted.
forceTableGC(t, f.Server(), sqlDB, "d", "foo")

// Resume our changefeed normally.
atomic.StoreInt32(&shouldWait, 0)
resume <- struct{}{}

// Verify that, at some point, Next() returns a "must be after replica GC
// threshold" error. In the common case, that'll be the third call, but
// various conditions will cause RangeFeed to emit duplicates and so it may
// be a few more.
//
// TODO(tbg): this should keep track of the values seen and once we have
// observed all four (which should never happen), fail the test.
// Verify that, at some point, Next() returns a "must
// be after replica GC threshold" error. In the common
// case, that'll be the second call, the first will
// should return the row from the backfill and the
// second should be returning
for {
msg, err := dataExpiredRows.Next()
if testutils.IsError(err, `must be after replica GC threshold`) {
t.Logf("got expected GC error: %s", err)
break
}
if msg != nil {
log.Infof(ctx, "ignoring message %s", msg)
t.Logf("ignoring message: %s", msg)
var decodedMessage struct {
After struct {
A int
B int
}
}
err = json.Unmarshal(msg.Value, &decodedMessage)
require.NoError(t, err)
delete(upsertedValues, decodedMessage.After.B)
if len(upsertedValues) == 0 {
t.Error("TestFeed emitted all values despite GC running")
return
}
}
}
}

t.Run("sinkless", sinklessTest(testFn))
t.Run("enterprise", enterpriseTest(testFn))
// NOTE(ssd): This test doesn't apply to enterprise
// changefeeds since enterprise changefeeds create a protected
// timestamp before beginning their backfill.
//
// TODO(ssd): Tenant test disabled because this test requires
// the fully TestServerInterface.
t.Run("sinkless", sinklessTest(testFn, feedTestNoTenants))
}

// TestChangefeedSchemaTTL ensures that changefeeds fail with an error in the case
Expand Down

0 comments on commit d32f641

Please sign in to comment.