Skip to content

Commit

Permalink
Merge #80247 #83000
Browse files Browse the repository at this point in the history
80247: changefeedccl: refactor to stop passing around untyped string maps r=[miretskiy] a=HonoreDB

Why did I think this would be a quick win. This was slow and boring
and might be too big a diff to do in one go.

This PR makes the WITH options map into a typed object,
and moves a lot of the parsing and validation code into methods on
that object so that sinks, encoders, and so on can take small
typed objects instead of the whole giant untyped map. The only
time we still treat it as a giant map is input validation.

I've kept the parsing of JSON WITH options in the individual sinks
to go along with the 'quick and dirty add anything' vibe they have.

There's still work to do here: the protos used to communicate
state live outside of ccl, so we're still passing around the
untyped map a lot. And some more logic that could be moved into
options.go . But the diff is big enough already.

Release note: None

Closes #80105

83000: vendor: bump Pebble to c2c0273062ce r=nicktrav a=jbowens

```
c2c02730 db: fix SetOptions batch range{del,key} refresh logic
2aebabad sstable: remove obsolete linter ignore directive
dc391ef8 docs: use local scale by default on nightly benchmarks page
ef1ca573 compaction: elision-only compactions for tables with only range keys
4a952c0d db: document Batch as unsafe for concurrent access
9a8e4742 db: support SetOptions on iterators created through NewExternalIter
f13de498 db: fix nil-pointer in external iterator range key iteration
65702e71 db: avoid allocation in newIters
f185d7fa db: remove RangeKeysArena
ae99f4f1 *: start reading range keys from sstables
d72083df *: implement compaction/flushing of range keys
a015e5a0 db: exclude empty batch range deletion, range key iterators
8b7a68b8 compaction: introduce multi level compaction mechanics
67f2653a *: pass keyspan.Spans by pointer not value
83b45b0a internal/keyspan: refactor InterleavingIter bounds checking
c3053e0b internal/keyspan: manually inline {start,end}Bound in MergingIter
728722c5 internal/keyspan: refactor DefragmentingIter's invalid span checks
a5d4e00e db: add BenchmarkIteratorSeekNoRangeKeys
96dc71db internal/keyspan: preallocate merging iter levels, items
86dd6fd5 internal/keyspan: replace calls to Span.Visible
```

Release note: none

Co-authored-by: Aaron Zinger <[email protected]>
Co-authored-by: Jackson Owens <[email protected]>
  • Loading branch information
3 people committed Jun 16, 2022
3 parents 22a00bc + d622979 + 3d03247 commit 50c375e
Show file tree
Hide file tree
Showing 39 changed files with 1,245 additions and 634 deletions.
6 changes: 3 additions & 3 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -1347,10 +1347,10 @@ def go_deps():
patches = [
"@com_github_cockroachdb_cockroach//build/patches:com_github_cockroachdb_pebble.patch",
],
sha256 = "6fb7e9f191b1efde244b55dec4697e6128eb17186b3cc25b0eae2f196024004b",
strip_prefix = "github.com/cockroachdb/[email protected]20220603185428-ad44a62e4d04",
sha256 = "0a9c86f063e1e28409981a760e9167a8f24e47fcfbaa707d2eae291c3d3d01dc",
strip_prefix = "github.com/cockroachdb/[email protected]20220616170629-c2c0273062ce",
urls = [
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/pebble/com_github_cockroachdb_pebble-v0.0.0-20220603185428-ad44a62e4d04.zip",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/pebble/com_github_cockroachdb_pebble-v0.0.0-20220616170629-c2c0273062ce.zip",
],
)
go_repository(
Expand Down
2 changes: 1 addition & 1 deletion build/bazelutil/distdir_files.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ DISTDIR_FILES = {
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/go-test-teamcity/com_github_cockroachdb_go_test_teamcity-v0.0.0-20191211140407-cff980ad0a55.zip": "bac30148e525b79d004da84d16453ddd2d5cd20528e9187f1d7dac708335674b",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/gostdlib/com_github_cockroachdb_gostdlib-v1.13.0.zip": "b3d43d8f95edf65f73a5348f29e1159823cac64b148f8d3bb48340bf55d70872",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/logtags/com_github_cockroachdb_logtags-v0.0.0-20211118104740-dabe8e521a4f.zip": "1972c3f171f118add3fd9e64bcea6cbb9959a3b7fa0ada308e8a7310813fea74",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/pebble/com_github_cockroachdb_pebble-v0.0.0-20220603185428-ad44a62e4d04.zip": "6fb7e9f191b1efde244b55dec4697e6128eb17186b3cc25b0eae2f196024004b",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/pebble/com_github_cockroachdb_pebble-v0.0.0-20220616170629-c2c0273062ce.zip": "0a9c86f063e1e28409981a760e9167a8f24e47fcfbaa707d2eae291c3d3d01dc",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/redact/com_github_cockroachdb_redact-v1.1.3.zip": "7778b1e4485e4f17f35e5e592d87eb99c29e173ac9507801d000ad76dd0c261e",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/returncheck/com_github_cockroachdb_returncheck-v0.0.0-20200612231554-92cdbca611dd.zip": "ce92ba4352deec995b1f2eecf16eba7f5d51f5aa245a1c362dfe24c83d31f82b",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/sentry-go/com_github_cockroachdb_sentry_go-v0.6.1-cockroachdb.2.zip": "fbb2207d02aecfdd411b1357efe1192dbb827959e36b7cab7491731ac55935c9",
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ require (
github.com/cockroachdb/go-test-teamcity v0.0.0-20191211140407-cff980ad0a55
github.com/cockroachdb/gostdlib v1.13.0
github.com/cockroachdb/logtags v0.0.0-20211118104740-dabe8e521a4f
github.com/cockroachdb/pebble v0.0.0-20220603185428-ad44a62e4d04
github.com/cockroachdb/pebble v0.0.0-20220616170629-c2c0273062ce
github.com/cockroachdb/redact v1.1.3
github.com/cockroachdb/returncheck v0.0.0-20200612231554-92cdbca611dd
github.com/cockroachdb/stress v0.0.0-20220310203902-58fb4627376e
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -455,8 +455,8 @@ github.com/cockroachdb/gostdlib v1.13.0/go.mod h1:eXX95p9QDrYwJfJ6AgeN9QnRa/lqqi
github.com/cockroachdb/logtags v0.0.0-20190617123548-eb05cc24525f/go.mod h1:i/u985jwjWRlyHXQbwatDASoW0RMlZ/3i9yJHE2xLkI=
github.com/cockroachdb/logtags v0.0.0-20211118104740-dabe8e521a4f h1:6jduT9Hfc0njg5jJ1DdKCFPdMBrp/mdZfCpa5h+WM74=
github.com/cockroachdb/logtags v0.0.0-20211118104740-dabe8e521a4f/go.mod h1:Vz9DsVWQQhf3vs21MhPMZpMGSht7O/2vFW2xusFUVOs=
github.com/cockroachdb/pebble v0.0.0-20220603185428-ad44a62e4d04 h1:8xmS8ngzwmPW3+/wAYZXkShMAhwn/8vkXASCcGSc3s4=
github.com/cockroachdb/pebble v0.0.0-20220603185428-ad44a62e4d04/go.mod h1:pr479tNxFRmcfDyklTqoRMDDVmRlEbL+d7a7rhKnrI4=
github.com/cockroachdb/pebble v0.0.0-20220616170629-c2c0273062ce h1:/4U2MjzT+2oYtVH00PuCpZcdh2m5G/h3pK7xhQS4krk=
github.com/cockroachdb/pebble v0.0.0-20220616170629-c2c0273062ce/go.mod h1:pr479tNxFRmcfDyklTqoRMDDVmRlEbL+d7a7rhKnrI4=
github.com/cockroachdb/redact v1.0.8/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg=
github.com/cockroachdb/redact v1.1.3 h1:AKZds10rFSIj7qADf0g46UixK8NNLwWTNdCIGS5wfSQ=
github.com/cockroachdb/redact v1.1.3/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg=
Expand Down
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ ALL_TESTS = [
"//pkg/ccl/changefeedccl/cdcevent:cdcevent_test",
"//pkg/ccl/changefeedccl/cdctest:cdctest_test",
"//pkg/ccl/changefeedccl/cdcutils:cdcutils_test",
"//pkg/ccl/changefeedccl/changefeedbase:changefeedbase_test",
"//pkg/ccl/changefeedccl/kvevent:kvevent_test",
"//pkg/ccl/changefeedccl/kvfeed:kvfeed_test",
"//pkg/ccl/changefeedccl/schemafeed:schemafeed_test",
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ go_library(
"//pkg/ccl/changefeedccl/cdcutils",
"//pkg/ccl/changefeedccl/changefeedbase",
"//pkg/ccl/changefeedccl/changefeeddist",
"//pkg/ccl/changefeedccl/changefeedvalidators",
"//pkg/ccl/changefeedccl/kvevent",
"//pkg/ccl/changefeedccl/kvfeed",
"//pkg/ccl/changefeedccl/schemafeed",
Expand Down
32 changes: 17 additions & 15 deletions pkg/ccl/changefeedccl/alter_changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupresolver"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedvalidators"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
Expand Down Expand Up @@ -95,7 +96,7 @@ func alterChangefeedPlanHook(
newTargets, newProgress, newStatementTime, originalSpecs, err := generateNewTargets(ctx,
p,
alterChangefeedStmt.Cmds,
newOptions,
newOptions.AsMap(), // TODO: Remove .AsMap()
prevDetails,
job.Progress(),
)
Expand All @@ -104,7 +105,7 @@ func alterChangefeedPlanHook(
}
newChangefeedStmt.Targets = newTargets

for key, value := range newOptions {
for key, value := range newOptions.AsMap() {
opt := tree.KVOption{Key: tree.Name(key)}
if len(value) > 0 {
opt.Value = tree.NewDString(value)
Expand Down Expand Up @@ -209,40 +210,41 @@ func generateNewOpts(
alterCmds tree.AlterChangefeedCmds,
prevOpts map[string]string,
prevSinkURI string,
) (map[string]string, string, error) {
) (changefeedbase.StatementOptions, string, error) {
sinkURI := prevSinkURI
newOptions := prevOpts
null := changefeedbase.StatementOptions{}

for _, cmd := range alterCmds {
switch v := cmd.(type) {
case *tree.AlterChangefeedSetOptions:
optsFn, err := p.TypeAsStringOpts(ctx, v.Options, changefeedbase.AlterChangefeedOptionExpectValues)
optsFn, err := p.TypeAsStringOpts(ctx, v.Options, changefeedvalidators.AlterOptionValidations)
if err != nil {
return nil, ``, err
return null, ``, err
}

opts, err := optsFn()
if err != nil {
return nil, ``, err
return null, ``, err
}

for key, value := range opts {
if _, ok := changefeedbase.AlterChangefeedUnsupportedOptions[key]; ok {
return nil, ``, pgerror.Newf(pgcode.InvalidParameterValue, `cannot alter option %q`, key)
return null, ``, pgerror.Newf(pgcode.InvalidParameterValue, `cannot alter option %q`, key)
}
if key == changefeedbase.OptSink {
newSinkURI, err := url.Parse(value)
if err != nil {
return nil, ``, err
return null, ``, err
}

prevSinkURI, err := url.Parse(sinkURI)
if err != nil {
return nil, ``, err
return null, ``, err
}

if newSinkURI.Scheme != prevSinkURI.Scheme {
return nil, ``, pgerror.Newf(
return null, ``, pgerror.Newf(
pgcode.InvalidParameterValue,
`New sink type %q does not match original sink type %q. `+
`Altering the sink type of a changefeed is disallowed, consider creating a new changefeed instead.`,
Expand All @@ -261,21 +263,21 @@ func generateNewOpts(
optKeys := v.Options.ToStrings()
for _, key := range optKeys {
if key == changefeedbase.OptSink {
return nil, ``, pgerror.Newf(pgcode.InvalidParameterValue, `cannot unset option %q`, key)
return null, ``, pgerror.Newf(pgcode.InvalidParameterValue, `cannot unset option %q`, key)
}
if _, ok := changefeedbase.ChangefeedOptionExpectValues[key]; !ok {
return nil, ``, pgerror.Newf(pgcode.InvalidParameterValue, `invalid option %q`, key)
return null, ``, pgerror.Newf(pgcode.InvalidParameterValue, `invalid option %q`, key)
}
if _, ok := changefeedbase.AlterChangefeedUnsupportedOptions[key]; ok {
return nil, ``, pgerror.Newf(pgcode.InvalidParameterValue, `cannot alter option %q`, key)
return null, ``, pgerror.Newf(pgcode.InvalidParameterValue, `cannot alter option %q`, key)
}
delete(newOptions, key)
}
telemetry.CountBucketed(telemetryPath+`.unset_options`, int64(len(optKeys)))
}
}

return newOptions, sinkURI, nil
return changefeedbase.MakeStatementOptions(newOptions), sinkURI, nil
}

func generateNewTargets(
Expand Down Expand Up @@ -368,7 +370,7 @@ func generateNewTargets(
for _, cmd := range alterCmds {
switch v := cmd.(type) {
case *tree.AlterChangefeedAddTarget:
targetOptsFn, err := p.TypeAsStringOpts(ctx, v.Options, changefeedbase.AlterChangefeedTargetOptions)
targetOptsFn, err := p.TypeAsStringOpts(ctx, v.Options, changefeedvalidators.AlterTargetOptionValidations)
if err != nil {
return nil, nil, hlc.Timestamp{}, nil, err
}
Expand Down
18 changes: 9 additions & 9 deletions pkg/ccl/changefeedccl/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -774,15 +774,6 @@ func tableToAvroSchema(
return newSchemaForRow(row.ForEachColumn(), sqlName, namespace)
}

// textualFromRow encodes the given row data into avro's defined JSON format.
func (r *avroDataRecord) textualFromRow(row cdcevent.Row) ([]byte, error) {
native, err := r.nativeFromRow(row.ForEachColumn())
if err != nil {
return nil, err
}
return r.codec.TextualFromNative(nil /* buf */, native)
}

// BinaryFromRow encodes the given row data into avro's defined binary format.
func (r *avroDataRecord) BinaryFromRow(buf []byte, it cdcevent.Iterator) ([]byte, error) {
native, err := r.nativeFromRow(it)
Expand All @@ -804,6 +795,15 @@ func (r *avroDataRecord) rowFromTextual(buf []byte) (rowenc.EncDatumRow, error)
return r.rowFromNative(native)
}

// textualFromRow encodes the given row data into avro's defined JSON format.
func (r *avroDataRecord) textualFromRow(row cdcevent.Row) ([]byte, error) {
native, err := r.nativeFromRow(row.ForEachColumn())
if err != nil {
return nil, err
}
return r.codec.TextualFromNative(nil /* buf */, native)
}

// RowFromBinary decodes the given row data from avro's defined binary format.
func (r *avroDataRecord) RowFromBinary(buf []byte) (rowenc.EncDatumRow, error) {
native, newBuf, err := r.codec.NativeFromBinary(buf)
Expand Down
5 changes: 2 additions & 3 deletions pkg/ccl/changefeedccl/avro_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (

"github.com/cockroachdb/apd/v3"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand Down Expand Up @@ -166,7 +165,7 @@ func parseAvroSchema(t *testing.T, j string) (*avroDataRecord, error) {
return tableToAvroSchema(
cdcevent.TestingMakeEventRow(
tabledesc.NewBuilder(&tableDesc).BuildImmutableTable(), 0, nil, false,
), "", string(changefeedbase.OptVirtualColumnsOmitted))
), "", "")
}

func avroFieldMetadataToColDesc(metadata string) (*descpb.ColumnDescriptor, error) {
Expand Down Expand Up @@ -664,13 +663,13 @@ func TestAvroSchema(t *testing.T) {
schema, err := tableToAvroSchema(
row, avroSchemaNoSuffix, "")
require.NoError(t, err)
textual, err := schema.textualFromRow(row)
if test.numRawBytes > 0 {
overhead := 4
binary, err := schema.BinaryFromRow(make([]byte, 0, test.numRawBytes+20), row.ForEachColumn())
require.NoError(t, err)
require.Equal(t, test.numRawBytes, len(binary)-overhead)
}
textual, err := schema.textualFromRow(row)
require.NoError(t, err)
// Trim the outermost {}.
value := string(textual[1 : len(textual)-1])
Expand Down
13 changes: 4 additions & 9 deletions pkg/ccl/changefeedccl/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,15 +205,11 @@ func createBenchmarkChangefeed(
tableDesc := desctestutils.TestingGetPublicTableDescriptor(s.DB(), keys.SystemSQLCodec, database, table)
spans := []roachpb.Span{tableDesc.PrimaryIndexSpan(keys.SystemSQLCodec)}
details := jobspb.ChangefeedDetails{
Tables: jobspb.ChangefeedTargets{tableDesc.GetID(): jobspb.ChangefeedTargetTable{
StatementTimeName: tableDesc.GetName(),
}},
Opts: map[string]string{
changefeedbase.OptEnvelope: string(changefeedbase.OptEnvelopeRow),
},
Tables: jobspb.ChangefeedTargets{tableDesc.GetID(): jobspb.ChangefeedTargetTable{StatementTimeName: tableDesc.GetName()}},
}
initialHighWater := hlc.Timestamp{}
encoder, err := makeJSONEncoder(details.Opts, AllTargets(details))
encodingOpts := changefeedbase.EncodingOptions{Format: changefeedbase.OptFormatJSON, Envelope: changefeedbase.OptEnvelopeRow}
encoder, err := makeJSONEncoder(encodingOpts, AllTargets(details))
if err != nil {
return nil, nil, err
}
Expand All @@ -230,7 +226,6 @@ func createBenchmarkChangefeed(
if needsInitialScan {
initialHighWater = details.StatementTime
}
_, withDiff := details.Opts[changefeedbase.OptDiff]
kvfeedCfg := kvfeed.Config{
Settings: settings,
DB: s.DB(),
Expand All @@ -242,7 +237,7 @@ func createBenchmarkChangefeed(
Metrics: &metrics.KVFeedMetrics,
MM: mm,
InitialHighWater: initialHighWater,
WithDiff: withDiff,
WithDiff: false,
NeedsInitialScan: needsInitialScan,
SchemaFeed: schemafeed.DoNothingSchemaFeed,
}
Expand Down
62 changes: 0 additions & 62 deletions pkg/ccl/changefeedccl/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@ package changefeedccl

import (
"context"
"strings"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts"
"github.com/cockroachdb/cockroach/pkg/keys"
Expand All @@ -22,7 +20,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
)

const (
Expand Down Expand Up @@ -98,62 +95,3 @@ func makeSpansToProtect(
addTablePrefix(keys.DescriptorTableID)
return spansToProtect
}

// initialScanTypeFromOpts determines the type of initial scan the changefeed
// should perform on the first run given the options provided from the user
func initialScanTypeFromOpts(opts map[string]string) (changefeedbase.InitialScanType, error) {
_, cursor := opts[changefeedbase.OptCursor]
initialScanType, initialScanSet := opts[changefeedbase.OptInitialScan]
_, initialScanOnlySet := opts[changefeedbase.OptInitialScanOnly]
_, noInitialScanSet := opts[changefeedbase.OptNoInitialScan]

if initialScanSet && noInitialScanSet {
return changefeedbase.InitialScan, errors.Errorf(
`cannot specify both %s and %s`, changefeedbase.OptInitialScan,
changefeedbase.OptNoInitialScan)
}

if initialScanSet && initialScanOnlySet {
return changefeedbase.InitialScan, errors.Errorf(
`cannot specify both %s and %s`, changefeedbase.OptInitialScan,
changefeedbase.OptInitialScanOnly)
}

if noInitialScanSet && initialScanOnlySet {
return changefeedbase.InitialScan, errors.Errorf(
`cannot specify both %s and %s`, changefeedbase.OptInitialScanOnly,
changefeedbase.OptNoInitialScan)
}

if initialScanSet {
const opt = changefeedbase.OptInitialScan
switch strings.ToLower(initialScanType) {
case ``, `yes`:
return changefeedbase.InitialScan, nil
case `no`:
return changefeedbase.NoInitialScan, nil
case `only`:
return changefeedbase.OnlyInitialScan, nil
default:
return changefeedbase.InitialScan, errors.Errorf(
`unknown %s: %s`, opt, initialScanType)
}
}

if initialScanOnlySet {
return changefeedbase.OnlyInitialScan, nil
}

if noInitialScanSet {
return changefeedbase.NoInitialScan, nil
}

// If we reach this point, this implies that the user did not specify any initial scan
// options. In this case the default behaviour is to perform an initial scan if the
// cursor is not specified.
if !cursor {
return changefeedbase.InitialScan, nil
}

return changefeedbase.NoInitialScan, nil
}
15 changes: 7 additions & 8 deletions pkg/ccl/changefeedccl/changefeed_dist.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,8 @@ func distChangefeedFlow(
progress jobspb.Progress,
resultsCh chan<- tree.Datums,
) error {
var err error
details, err = validateDetails(details)
if err != nil {
return err
}

opts := changefeedbase.MakeStatementOptions(details.Opts)

// NB: A non-empty high water indicates that we have checkpointed a resolved
// timestamp. Skipping the initial scan is equivalent to starting the
Expand All @@ -80,7 +77,7 @@ func distChangefeedFlow(
// We want to set the highWater and thus avoid an initial scan if either
// this is a cursor and there was no request for one, or we don't have a
// cursor but we have a request to not have an initial scan.
initialScanType, err := initialScanTypeFromOpts(details.Opts)
initialScanType, err := opts.GetInitialScanType()
if err != nil {
return err
}
Expand Down Expand Up @@ -120,14 +117,16 @@ func distChangefeedFlow(
return err
}

if filterExpr, isSet := details.Opts[changefeedbase.OptPrimaryKeyFilter]; isSet {
filters := opts.GetFilters()

if filters.WithPredicate {
if len(tableDescs) > 1 {
return pgerror.Newf(pgcode.InvalidParameterValue,
"option %s can only be used with 1 changefeed target (found %d)",
changefeedbase.OptPrimaryKeyFilter, len(tableDescs),
)
}
trackedSpans, err = constrainSpansByExpression(ctx, execCtx, filterExpr, tableDescs[0])
trackedSpans, err = constrainSpansByExpression(ctx, execCtx, filters.PrimaryKeyFilter, tableDescs[0])
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 50c375e

Please sign in to comment.