Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cdc: test negative timestamp cdc #88058

Merged
merged 1 commit into from
Sep 19, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
@@ -328,6 +328,11 @@ func createChangefeedJobRecord(
}
var initialHighWater hlc.Timestamp
evalTimestamp := func(s string) (hlc.Timestamp, error) {
if knobs, ok := p.ExecCfg().DistSQLSrv.TestingKnobs.Changefeed.(*TestingKnobs); ok {
if knobs != nil && knobs.OverrideCursor != nil {
s = knobs.OverrideCursor(&statementTime)
}
}
asOfClause := tree.AsOfClause{Expr: tree.NewStrVal(s)}
asOf, err := p.EvalAsOfTimestamp(ctx, asOfClause)
if err != nil {
27 changes: 27 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
@@ -666,6 +666,7 @@ func TestChangefeedCursor(t *testing.T) {
// 'after', throw a couple sleeps around them. We round timestamps to
// Microsecond granularity for Postgres compatibility, so make the
// sleeps 10x that.
beforeInsert := s.Server.Clock().Now()
sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'before')`)
time.Sleep(10 * time.Microsecond)

@@ -677,6 +678,32 @@ func TestChangefeedCursor(t *testing.T) {
time.Sleep(10 * time.Microsecond)
sqlDB.Exec(t, `INSERT INTO foo VALUES (2, 'after')`)

// The below function is currently used to test negative timestamp in cursor i.e of the form
// "-3us".
// Using this function we can calculate the difference with the time that was before
// the insert statement, which is set as the new cursor value inside createChangefeedJobRecord
calculateCursor := func(currentTime *hlc.Timestamp) string {
// Should convert to microseconds as that is the maximum precision we support
diff := (beforeInsert.WallTime - currentTime.WallTime) / 1000
diffStr := strconv.FormatInt(diff, 10) + "us"
return diffStr
}

knobs := s.TestingKnobs.DistSQL.(*execinfra.TestingKnobs).Changefeed.(*TestingKnobs)
knobs.OverrideCursor = calculateCursor

// The "-3 days" is a placeholder here - it will be replaced with actual difference
// in createChangefeedJobRecord
biradarganesh25 marked this conversation as resolved.
Show resolved Hide resolved
fooInterval := feed(t, f, `CREATE CHANGEFEED FOR foo WITH cursor=$1`, "-3 days")
defer closeFeed(t, fooInterval)
assertPayloads(t, fooInterval, []string{
`foo: [1]->{"after": {"a": 1, "b": "before"}}`,
`foo: [2]->{"after": {"a": 2, "b": "after"}}`,
})

// We do not need to override for the remaining cases
knobs.OverrideCursor = nil

fooLogical := feed(t, f, `CREATE CHANGEFEED FOR foo WITH cursor=$1`, tsLogical)
defer closeFeed(t, fooLogical)
assertPayloads(t, fooLogical, []string{
7 changes: 7 additions & 0 deletions pkg/ccl/changefeedccl/testing_knobs.go
Original file line number Diff line number Diff line change
@@ -15,6 +15,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/mon"
)

@@ -47,6 +48,12 @@ type TestingKnobs struct {
ShouldReplan func(ctx context.Context, oldPlan, newPlan *sql.PhysicalPlan) bool
// RaiseRetryableError is a knob used to possibly return an error.
RaiseRetryableError func() error

// This is currently used to test negative timestamp in cursor i.e of the form
// "-3us". Check TestChangefeedCursor for more info. This function needs to be in the
// knobs as current statement time will only be available once the create changefeed statement
// starts executing.
OverrideCursor func(currentTime *hlc.Timestamp) string
}

// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.