diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index 50bbfb142b17..e6fb4ed522fd 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -328,6 +328,10 @@ func createChangefeedJobRecord( } var initialHighWater hlc.Timestamp evalTimestamp := func(s string) (hlc.Timestamp, error) { + knobs := p.ExecCfg().DistSQLSrv.TestingKnobs.Changefeed.(*TestingKnobs) + if knobs.OverrideCursor != nil { + s = knobs.OverrideCursor(&statementTime) + } asOfClause := tree.AsOfClause{Expr: tree.NewStrVal(s)} asOf, err := p.EvalAsOfTimestamp(ctx, asOfClause) if err != nil { diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 7cab86b5261b..a2d1194916b9 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -666,6 +666,7 @@ func TestChangefeedCursor(t *testing.T) { // 'after', throw a couple sleeps around them. We round timestamps to // Microsecond granularity for Postgres compatibility, so make the // sleeps 10x that. + beforeInsert := s.Server.Clock().Now() sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'before')`) time.Sleep(10 * time.Microsecond) @@ -677,6 +678,32 @@ func TestChangefeedCursor(t *testing.T) { time.Sleep(10 * time.Microsecond) sqlDB.Exec(t, `INSERT INTO foo VALUES (2, 'after')`) + // The below function is currently used to test negative timestamp in cursor i.e of the form + // "-3us". + // Using this function we can calculate the difference with the time that was before + // the insert statement, which is set as the new cursor value inside createChangefeedJobRecord + calculateCursor := func(currentTime *hlc.Timestamp) string { + // Should convert to microseconds as that is the maximum precision we support + diff := (beforeInsert.WallTime - currentTime.WallTime) / 1000 + diffStr := strconv.FormatInt(diff, 10) + "us" + return diffStr + } + + knobs := s.TestingKnobs.DistSQL.(*execinfra.TestingKnobs).Changefeed.(*TestingKnobs) + knobs.OverrideCursor = calculateCursor + + // The "-3 days" is a placeholder here - it will be replaced with actual difference + // in createChangefeedJobRecord + fooInterval := feed(t, f, `CREATE CHANGEFEED FOR foo WITH cursor=$1`, "-3 days") + defer closeFeed(t, fooInterval) + assertPayloads(t, fooInterval, []string{ + `foo: [1]->{"after": {"a": 1, "b": "before"}}`, + `foo: [2]->{"after": {"a": 2, "b": "after"}}`, + }) + + // We do not need to override for the remaining cases + knobs.OverrideCursor = nil + fooLogical := feed(t, f, `CREATE CHANGEFEED FOR foo WITH cursor=$1`, tsLogical) defer closeFeed(t, fooLogical) assertPayloads(t, fooLogical, []string{ diff --git a/pkg/ccl/changefeedccl/testing_knobs.go b/pkg/ccl/changefeedccl/testing_knobs.go index f7a993da2340..5735189ed9fb 100644 --- a/pkg/ccl/changefeedccl/testing_knobs.go +++ b/pkg/ccl/changefeedccl/testing_knobs.go @@ -15,6 +15,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/mon" ) @@ -47,6 +48,12 @@ type TestingKnobs struct { ShouldReplan func(ctx context.Context, oldPlan, newPlan *sql.PhysicalPlan) bool // RaiseRetryableError is a knob used to possibly return an error. RaiseRetryableError func() error + + // This is currently used to test negative timestamp in cursor i.e of the form + // "-3us". Check TestChangefeedCursor for more info. This function needs to be in the + // knobs as current statement time will only be available once the create changefeed statement + // starts executing. + OverrideCursor func(currentTime *hlc.Timestamp) string } // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.