Skip to content

Commit

Permalink
cdc: test negative timestamp value for cursor
Browse files Browse the repository at this point in the history
The negative timestamp value is calculated as the difference between a
predefined time (this is the time from which we expect changefeed to run)
and the current statement time. knobs is used to get the current statement
time because it will only be available once the change feed statement
starts executing.

Resolves: #82350

Release note: None
  • Loading branch information
biradarganesh25 committed Sep 16, 2022
1 parent 95677eb commit 2355d4d
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 0 deletions.
4 changes: 4 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,10 @@ func createChangefeedJobRecord(
}
var initialHighWater hlc.Timestamp
evalTimestamp := func(s string) (hlc.Timestamp, error) {
knobs := p.ExecCfg().DistSQLSrv.TestingKnobs.Changefeed.(*TestingKnobs)
if knobs.OverrideCursor != nil {
s = knobs.OverrideCursor(&statementTime)
}
asOfClause := tree.AsOfClause{Expr: tree.NewStrVal(s)}
asOf, err := p.EvalAsOfTimestamp(ctx, asOfClause)
if err != nil {
Expand Down
27 changes: 27 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,7 @@ func TestChangefeedCursor(t *testing.T) {
// 'after', throw a couple sleeps around them. We round timestamps to
// Microsecond granularity for Postgres compatibility, so make the
// sleeps 10x that.
beforeInsert := s.Server.Clock().Now()
sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'before')`)
time.Sleep(10 * time.Microsecond)

Expand All @@ -677,6 +678,32 @@ func TestChangefeedCursor(t *testing.T) {
time.Sleep(10 * time.Microsecond)
sqlDB.Exec(t, `INSERT INTO foo VALUES (2, 'after')`)

// The below function is currently used to test negative timestamp in cursor i.e of the form
// "-3us".
// Using this function we can calculate the difference with the time that was before
// the insert statement, which is set as the new cursor value inside createChangefeedJobRecord
calculateCursor := func(currentTime *hlc.Timestamp) string {
// Should convert to microseconds as that is the maximum precision we support
diff := (beforeInsert.WallTime - currentTime.WallTime) / 1000
diffStr := strconv.FormatInt(diff, 10) + "us"
return diffStr
}

knobs := s.TestingKnobs.DistSQL.(*execinfra.TestingKnobs).Changefeed.(*TestingKnobs)
knobs.OverrideCursor = calculateCursor

// The "-3 days" is a placeholder here - it will be replaced with actual difference
// in createChangefeedJobRecord
fooInterval := feed(t, f, `CREATE CHANGEFEED FOR foo WITH cursor=$1`, "-3 days")
defer closeFeed(t, fooInterval)
assertPayloads(t, fooInterval, []string{
`foo: [1]->{"after": {"a": 1, "b": "before"}}`,
`foo: [2]->{"after": {"a": 2, "b": "after"}}`,
})

// We do not need to override for the remaining cases
knobs.OverrideCursor = nil

fooLogical := feed(t, f, `CREATE CHANGEFEED FOR foo WITH cursor=$1`, tsLogical)
defer closeFeed(t, fooLogical)
assertPayloads(t, fooLogical, []string{
Expand Down
7 changes: 7 additions & 0 deletions pkg/ccl/changefeedccl/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/mon"
)

Expand Down Expand Up @@ -47,6 +48,12 @@ type TestingKnobs struct {
ShouldReplan func(ctx context.Context, oldPlan, newPlan *sql.PhysicalPlan) bool
// RaiseRetryableError is a knob used to possibly return an error.
RaiseRetryableError func() error

// This is currently used to test negative timestamp in cursor i.e of the form
// "-3us". Check TestChangefeedCursor for more info. This function needs to be in the
// knobs as current statement time will only be available once the create changefeed statement
// starts executing.
OverrideCursor func(currentTime *hlc.Timestamp) string
}

// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
Expand Down

0 comments on commit 2355d4d

Please sign in to comment.