From 176bf031d1a9b8bd50c6bfefd19be3776da88da7 Mon Sep 17 00:00:00 2001 From: Ganeshprasad Rajashekhar Biradar Date: Fri, 16 Sep 2022 11:44:02 -0500 Subject: [PATCH] cdc: test negative timestamp value for cursor The negative timestamp value is calculated as the difference between a predefined time (this is the time from which we expect changefeed to run) and the current statement time. knobs is used to get the current statement time because it will only be available once the change feed statement starts executing. Resolves: #82350 Release note: None --- pkg/ccl/changefeedccl/changefeed_stmt.go | 4 ++++ pkg/ccl/changefeedccl/changefeed_test.go | 27 ++++++++++++++++++++++++ pkg/ccl/changefeedccl/testing_knobs.go | 7 ++++++ 3 files changed, 38 insertions(+) diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index a3519ae781d1..83a9a41b6844 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -328,6 +328,10 @@ func createChangefeedJobRecord( } var initialHighWater hlc.Timestamp evalTimestamp := func(s string) (hlc.Timestamp, error) { + knobs := p.ExecCfg().DistSQLSrv.TestingKnobs.Changefeed.(*TestingKnobs) + if knobs.OverrideCursor != nil { + s = knobs.OverrideCursor(&statementTime) + } asOfClause := tree.AsOfClause{Expr: tree.NewStrVal(s)} asOf, err := p.EvalAsOfTimestamp(ctx, asOfClause) if err != nil { diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 6d0fea2d18c0..51754479e435 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -666,6 +666,7 @@ func TestChangefeedCursor(t *testing.T) { // 'after', throw a couple sleeps around them. We round timestamps to // Microsecond granularity for Postgres compatibility, so make the // sleeps 10x that. + beforeInsert := s.Server.Clock().Now() sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'before')`) time.Sleep(10 * time.Microsecond) @@ -677,6 +678,32 @@ func TestChangefeedCursor(t *testing.T) { time.Sleep(10 * time.Microsecond) sqlDB.Exec(t, `INSERT INTO foo VALUES (2, 'after')`) + // The below function is currently used to test negative timestamp in cursor i.e of the form + // "-3us". + // Using this function we can calculate the difference with the time that was before + // the insert statement, which is set as the new cursor value inside createChangefeedJobRecord + calculateCursor := func(currentTime *hlc.Timestamp) string { + // Should convert to microseconds as that is the maximum precision we support + diff := (beforeInsert.WallTime - currentTime.WallTime) / 1000 + diffStr := strconv.FormatInt(diff, 10) + "us" + return diffStr + } + + knobs := s.TestingKnobs.DistSQL.(*execinfra.TestingKnobs).Changefeed.(*TestingKnobs) + knobs.OverrideCursor = calculateCursor + + // The "-3 days" is a placeholder here - it will be replaced with actual difference + // in createChangefeedJobRecord + fooInterval := feed(t, f, `CREATE CHANGEFEED FOR foo WITH cursor=$1`, "-3 days") + defer closeFeed(t, fooInterval) + assertPayloads(t, fooInterval, []string{ + `foo: [1]->{"after": {"a": 1, "b": "before"}}`, + `foo: [2]->{"after": {"a": 2, "b": "after"}}`, + }) + + // We do not need to override for the remaining cases + knobs.OverrideCursor = nil + fooLogical := feed(t, f, `CREATE CHANGEFEED FOR foo WITH cursor=$1`, tsLogical) defer closeFeed(t, fooLogical) assertPayloads(t, fooLogical, []string{ diff --git a/pkg/ccl/changefeedccl/testing_knobs.go b/pkg/ccl/changefeedccl/testing_knobs.go index f7a993da2340..5735189ed9fb 100644 --- a/pkg/ccl/changefeedccl/testing_knobs.go +++ b/pkg/ccl/changefeedccl/testing_knobs.go @@ -15,6 +15,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/mon" ) @@ -47,6 +48,12 @@ type TestingKnobs struct { ShouldReplan func(ctx context.Context, oldPlan, newPlan *sql.PhysicalPlan) bool // RaiseRetryableError is a knob used to possibly return an error. RaiseRetryableError func() error + + // This is currently used to test negative timestamp in cursor i.e of the form + // "-3us". Check TestChangefeedCursor for more info. This function needs to be in the + // knobs as current statement time will only be available once the create changefeed statement + // starts executing. + OverrideCursor func(currentTime *hlc.Timestamp) string } // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.