From 2355d4de6f68d996f13d36c9859e0ae655682d85 Mon Sep 17 00:00:00 2001 From: Ganeshprasad Rajashekhar Biradar Date: Fri, 16 Sep 2022 11:44:02 -0500 Subject: [PATCH] cdc: test negative timestamp value for cursor The negative timestamp value is calculated as the difference between a predefined time (this is the time from which we expect changefeed to run) and the current statement time. knobs is used to get the current statement time because it will only be available once the change feed statement starts executing. Resolves: #82350 Release note: None --- pkg/ccl/changefeedccl/changefeed_stmt.go | 4 ++++ pkg/ccl/changefeedccl/changefeed_test.go | 27 ++++++++++++++++++++++++ pkg/ccl/changefeedccl/testing_knobs.go | 7 ++++++ 3 files changed, 38 insertions(+) diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index 50bbfb142b17..e6fb4ed522fd 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -328,6 +328,10 @@ func createChangefeedJobRecord( } var initialHighWater hlc.Timestamp evalTimestamp := func(s string) (hlc.Timestamp, error) { + knobs := p.ExecCfg().DistSQLSrv.TestingKnobs.Changefeed.(*TestingKnobs) + if knobs.OverrideCursor != nil { + s = knobs.OverrideCursor(&statementTime) + } asOfClause := tree.AsOfClause{Expr: tree.NewStrVal(s)} asOf, err := p.EvalAsOfTimestamp(ctx, asOfClause) if err != nil { diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 7cab86b5261b..a2d1194916b9 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -666,6 +666,7 @@ func TestChangefeedCursor(t *testing.T) { // 'after', throw a couple sleeps around them. We round timestamps to // Microsecond granularity for Postgres compatibility, so make the // sleeps 10x that. + beforeInsert := s.Server.Clock().Now() sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'before')`) time.Sleep(10 * time.Microsecond) @@ -677,6 +678,32 @@ func TestChangefeedCursor(t *testing.T) { time.Sleep(10 * time.Microsecond) sqlDB.Exec(t, `INSERT INTO foo VALUES (2, 'after')`) + // The below function is currently used to test negative timestamp in cursor i.e of the form + // "-3us". + // Using this function we can calculate the difference with the time that was before + // the insert statement, which is set as the new cursor value inside createChangefeedJobRecord + calculateCursor := func(currentTime *hlc.Timestamp) string { + // Should convert to microseconds as that is the maximum precision we support + diff := (beforeInsert.WallTime - currentTime.WallTime) / 1000 + diffStr := strconv.FormatInt(diff, 10) + "us" + return diffStr + } + + knobs := s.TestingKnobs.DistSQL.(*execinfra.TestingKnobs).Changefeed.(*TestingKnobs) + knobs.OverrideCursor = calculateCursor + + // The "-3 days" is a placeholder here - it will be replaced with actual difference + // in createChangefeedJobRecord + fooInterval := feed(t, f, `CREATE CHANGEFEED FOR foo WITH cursor=$1`, "-3 days") + defer closeFeed(t, fooInterval) + assertPayloads(t, fooInterval, []string{ + `foo: [1]->{"after": {"a": 1, "b": "before"}}`, + `foo: [2]->{"after": {"a": 2, "b": "after"}}`, + }) + + // We do not need to override for the remaining cases + knobs.OverrideCursor = nil + fooLogical := feed(t, f, `CREATE CHANGEFEED FOR foo WITH cursor=$1`, tsLogical) defer closeFeed(t, fooLogical) assertPayloads(t, fooLogical, []string{ diff --git a/pkg/ccl/changefeedccl/testing_knobs.go b/pkg/ccl/changefeedccl/testing_knobs.go index f7a993da2340..5735189ed9fb 100644 --- a/pkg/ccl/changefeedccl/testing_knobs.go +++ b/pkg/ccl/changefeedccl/testing_knobs.go @@ -15,6 +15,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/mon" ) @@ -47,6 +48,12 @@ type TestingKnobs struct { ShouldReplan func(ctx context.Context, oldPlan, newPlan *sql.PhysicalPlan) bool // RaiseRetryableError is a knob used to possibly return an error. RaiseRetryableError func() error + + // This is currently used to test negative timestamp in cursor i.e of the form + // "-3us". Check TestChangefeedCursor for more info. This function needs to be in the + // knobs as current statement time will only be available once the create changefeed statement + // starts executing. + OverrideCursor func(currentTime *hlc.Timestamp) string } // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.