Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changefeedccl: unify initial_scan option syntax #79324

Merged
merged 1 commit into from
Apr 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 58 additions & 6 deletions pkg/ccl/changefeedccl/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package changefeedccl

import (
"context"
"strings"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
Expand All @@ -21,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
)

const (
Expand Down Expand Up @@ -97,11 +99,61 @@ func makeSpansToProtect(
return spansToProtect
}

// initialScanFromOptions returns whether or not the options indicate the need
// for an initial scan on the first run.
func initialScanFromOptions(opts map[string]string) bool {
// initialScanTypeFromOpts determines the type of initial scan the changefeed
// should perform on the first run given the options provided from the user
func initialScanTypeFromOpts(opts map[string]string) (changefeedbase.InitialScanType, error) {
_, cursor := opts[changefeedbase.OptCursor]
_, initialScan := opts[changefeedbase.OptInitialScan]
_, noInitialScan := opts[changefeedbase.OptNoInitialScan]
return (cursor && initialScan) || (!cursor && !noInitialScan)
initialScanType, initialScanSet := opts[changefeedbase.OptInitialScan]
sherman-grewal marked this conversation as resolved.
Show resolved Hide resolved
_, initialScanOnlySet := opts[changefeedbase.OptInitialScanOnly]
_, noInitialScanSet := opts[changefeedbase.OptNoInitialScan]
sherman-grewal marked this conversation as resolved.
Show resolved Hide resolved

if initialScanSet && noInitialScanSet {
return changefeedbase.InitialScan, errors.Errorf(
`cannot specify both %s and %s`, changefeedbase.OptInitialScan,
changefeedbase.OptNoInitialScan)
}

if initialScanSet && initialScanOnlySet {
return changefeedbase.InitialScan, errors.Errorf(
`cannot specify both %s and %s`, changefeedbase.OptInitialScan,
changefeedbase.OptInitialScanOnly)
}

if noInitialScanSet && initialScanOnlySet {
return changefeedbase.InitialScan, errors.Errorf(
`cannot specify both %s and %s`, changefeedbase.OptInitialScanOnly,
changefeedbase.OptNoInitialScan)
}

if initialScanSet {
const opt = changefeedbase.OptInitialScan
switch strings.ToLower(initialScanType) {
case ``, `yes`:
return changefeedbase.InitialScan, nil
case `no`:
return changefeedbase.NoInitialScan, nil
case `only`:
return changefeedbase.OnlyInitialScan, nil
default:
return changefeedbase.InitialScan, errors.Errorf(
`unknown %s: %s`, opt, initialScanType)
}
}

if initialScanOnlySet {
return changefeedbase.OnlyInitialScan, nil
}

if noInitialScanSet {
return changefeedbase.NoInitialScan, nil
}

// If we reach this point, this implies that the user did not specify any initial scan
// options. In this case the default behaviour is to perform an initial scan if the
// cursor is not specified.
if !cursor {
return changefeedbase.InitialScan, nil
}

return changefeedbase.NoInitialScan, nil
}
7 changes: 6 additions & 1 deletion pkg/ccl/changefeedccl/changefeed_dist.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package changefeedccl
import (
"context"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeeddist"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
Expand Down Expand Up @@ -75,7 +76,11 @@ func distChangefeedFlow(
// We want to set the highWater and thus avoid an initial scan if either
// this is a cursor and there was no request for one, or we don't have a
// cursor but we have a request to not have an initial scan.
if noHighWater && !initialScanFromOptions(details.Opts) {
initialScanType, err := initialScanTypeFromOpts(details.Opts)
if err != nil {
return err
}
if noHighWater && initialScanType == changefeedbase.NoInitialScan {
// If there is a cursor, the statement time has already been set to it.
progress.Progress = &jobspb.Progress_HighWater{HighWater: &details.StatementTime}
}
Expand Down
40 changes: 20 additions & 20 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,11 @@ func changefeedPlanHook(
codec := p.ExecCfg().Codec

activeTimestampProtection := changefeedbase.ActiveProtectedTimestampsEnabled.Get(&p.ExecCfg().Settings.SV)
shouldProtectTimestamp := activeTimestampProtection || initialScanFromOptions(details.Opts)
initialScanType, err := initialScanTypeFromOpts(details.Opts)
if err != nil {
return err
}
shouldProtectTimestamp := activeTimestampProtection || (initialScanType != changefeedbase.NoInitialScan)
if shouldProtectTimestamp {
ptr = createProtectedTimestampRecord(ctx, codec, jobID, AllTargets(details), details.StatementTime, progress.GetChangefeed())
}
Expand Down Expand Up @@ -295,8 +299,14 @@ func createChangefeedJobRecord(
endTime = asOf.Timestamp
}

if _, ok := opts[changefeedbase.OptInitialScanOnly]; ok {
endTime = statementTime
{
initialScanType, err := initialScanTypeFromOpts(opts)
if err != nil {
return nil, err
}
if initialScanType == changefeedbase.OnlyInitialScan {
endTime = statementTime
}
}

targetList := uniqueTableNames(changefeedStmt.Targets)
Expand Down Expand Up @@ -762,15 +772,6 @@ func validateDetails(details jobspb.ChangefeedDetails) (jobspb.ChangefeedDetails
`unknown %s: %s`, opt, v)
}
}
{
_, withInitialScan := details.Opts[changefeedbase.OptInitialScan]
_, noInitialScan := details.Opts[changefeedbase.OptNoInitialScan]
if withInitialScan && noInitialScan {
return jobspb.ChangefeedDetails{}, errors.Errorf(
`cannot specify both %s and %s`, changefeedbase.OptInitialScan,
changefeedbase.OptNoInitialScan)
}
}
{
const opt = changefeedbase.OptEnvelope
switch v := changefeedbase.EnvelopeType(details.Opts[opt]); v {
Expand Down Expand Up @@ -824,21 +825,20 @@ func validateDetails(details jobspb.ChangefeedDetails) (jobspb.ChangefeedDetails
}
}
{
_, noInitialScan := details.Opts[changefeedbase.OptNoInitialScan]
initialScanType := details.Opts[changefeedbase.OptInitialScan]
_, onlyInitialScan := details.Opts[changefeedbase.OptInitialScanOnly]
_, endTime := details.Opts[changefeedbase.OptEndTime]
if onlyInitialScan && noInitialScan {
return jobspb.ChangefeedDetails{}, errors.Errorf(
`cannot specify both %s and %s`, changefeedbase.OptInitialScanOnly,
changefeedbase.OptNoInitialScan)
}
if endTime && onlyInitialScan {
return jobspb.ChangefeedDetails{}, errors.Errorf(
`cannot specify both %s and %s`, changefeedbase.OptInitialScanOnly,
changefeedbase.OptEndTime)
}
}
{

if strings.ToLower(initialScanType) == `only` && endTime {
return jobspb.ChangefeedDetails{}, errors.Errorf(
`cannot specify both %s='only' and %s`, changefeedbase.OptInitialScan, changefeedbase.OptEndTime)
}

if !details.EndTime.IsEmpty() && details.EndTime.Less(details.StatementTime) {
return jobspb.ChangefeedDetails{}, errors.Errorf(
`specified end time %s cannot be less than statement time %s`,
Expand Down
Loading