diff --git a/docs/generated/sql/bnf/stmt_block.bnf b/docs/generated/sql/bnf/stmt_block.bnf index ba1b215c4f38..c38491dddcba 100644 --- a/docs/generated/sql/bnf/stmt_block.bnf +++ b/docs/generated/sql/bnf/stmt_block.bnf @@ -1258,6 +1258,7 @@ unreserved_keyword ::= | 'UNCOMMITTED' | 'UNKNOWN' | 'UNLOGGED' + | 'UNSET' | 'UNSPLIT' | 'UNTIL' | 'UPDATE' @@ -2473,6 +2474,8 @@ alter_default_privileges_target_object ::= alter_changefeed_cmd ::= 'ADD' changefeed_targets | 'DROP' changefeed_targets + | 'SET' kv_option_list + | 'UNSET' name_list alter_backup_cmd ::= 'ADD' backup_kms diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index c8db5f6ba83f..9f20d8696085 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -67,7 +67,6 @@ go_library( "//pkg/sql/execinfra", "//pkg/sql/execinfrapb", "//pkg/sql/flowinfra", - "//pkg/sql/parser", "//pkg/sql/pgwire/pgcode", "//pkg/sql/pgwire/pgerror", "//pkg/sql/pgwire/pgnotice", diff --git a/pkg/ccl/changefeedccl/alter_changefeed_stmt.go b/pkg/ccl/changefeedccl/alter_changefeed_stmt.go index 7baba9cd3605..55c540da9a1f 100644 --- a/pkg/ccl/changefeedccl/alter_changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/alter_changefeed_stmt.go @@ -11,15 +11,17 @@ package changefeedccl import ( "context" + "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupresolver" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/sql" - "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" - "github.com/cockroachdb/cockroach/pkg/sql/parser" - "github.com/cockroachdb/cockroach/pkg/sql/privilege" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/resolver" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -30,11 +32,6 @@ func init() { sql.AddPlanHook("alter changefeed", alterChangefeedPlanHook) } -type alterChangefeedOpts struct { - AddTargets []tree.TargetList - DropTargets []tree.TargetList -} - // alterChangefeedPlanHook implements sql.PlanHookFn. 
func alterChangefeedPlanHook( ctx context.Context, stmt tree.Statement, p sql.PlanHookState, @@ -67,7 +64,7 @@ func alterChangefeedPlanHook( return err } - details, ok := job.Details().(jobspb.ChangefeedDetails) + prevDetails, ok := job.Details().(jobspb.ChangefeedDetails) if !ok { return errors.Errorf(`job %d is not changefeed job`, jobID) } @@ -76,116 +73,188 @@ func alterChangefeedPlanHook( return errors.Errorf(`job %d is not paused`, jobID) } - var opts alterChangefeedOpts - for _, cmd := range alterChangefeedStmt.Cmds { - switch v := cmd.(type) { - case *tree.AlterChangefeedAddTarget: - opts.AddTargets = append(opts.AddTargets, v.Targets) - case *tree.AlterChangefeedDropTarget: - opts.DropTargets = append(opts.DropTargets, v.Targets) + // this CREATE CHANGEFEED node will be used to update the existing changefeed + newChangefeedStmt := &tree.CreateChangefeed{ + SinkURI: tree.NewDString(prevDetails.SinkURI), + } + + optionsMap := make(map[string]tree.KVOption, len(prevDetails.Opts)) + + // pull the options that are set for the existing changefeed + for key, value := range prevDetails.Opts { + // There are some options (e.g. topics) that we set during the creation of + // a changefeed, but we do not allow these options to be set by the user. + // Hence, we can not include these options in our new CREATE CHANGEFEED + // statement. + if _, ok := changefeedbase.ChangefeedOptionExpectValues[key]; !ok { + continue + } + existingOpt := tree.KVOption{Key: tree.Name(key)} + if len(value) > 0 { + existingOpt.Value = tree.NewDString(value) } + optionsMap[key] = existingOpt } - var initialHighWater hlc.Timestamp statementTime := hlc.Timestamp{ WallTime: p.ExtendedEvalContext().GetStmtTimestamp().UnixNano(), } - if opts.AddTargets != nil { - var targetDescs []catalog.Descriptor - - for _, targetList := range opts.AddTargets { - descs, err := getTableDescriptors(ctx, p, &targetList, statementTime, initialHighWater) - if err != nil { - return err - } - targetDescs = append(targetDescs, descs...) - } + allDescs, err := backupresolver.LoadAllDescs(ctx, p.ExecCfg(), statementTime) + if err != nil { + return err + } + descResolver, err := backupresolver.NewDescriptorResolver(allDescs) + if err != nil { + return err + } - newTargets, newTables, err := getTargetsAndTables(ctx, p, targetDescs, details.Opts) - if err != nil { - return err - } - // add old targets - for id, table := range details.Tables { - newTables[id] = table - } - details.Tables = newTables - details.TargetSpecifications = append(details.TargetSpecifications, newTargets...) 
+ newDescs := make(map[descpb.ID]*tree.UnresolvedName) + for _, target := range AllTargets(prevDetails) { + desc := descResolver.DescByID[target.TableID] + newDescs[target.TableID] = tree.NewUnresolvedName(desc.GetName()) } - if opts.DropTargets != nil { - var targetDescs []catalog.Descriptor + for _, cmd := range alterChangefeedStmt.Cmds { + switch v := cmd.(type) { + case *tree.AlterChangefeedAddTarget: + for _, targetPattern := range v.Targets.Tables { + targetName, err := getTargetName(targetPattern) + if err != nil { + return err + } + found, _, desc, err := resolver.ResolveExisting( + ctx, + targetName.ToUnresolvedObjectName(), + descResolver, + tree.ObjectLookupFlags{}, + p.CurrentDatabase(), + p.CurrentSearchPath(), + ) + if err != nil { + return err + } + if !found { + return pgerror.Newf(pgcode.InvalidParameterValue, `target %q does not exist`, tree.ErrString(targetPattern)) + } + newDescs[desc.GetID()] = tree.NewUnresolvedName(desc.GetName()) + } + case *tree.AlterChangefeedDropTarget: + for _, targetPattern := range v.Targets.Tables { + targetName, err := getTargetName(targetPattern) + if err != nil { + return err + } + found, _, desc, err := resolver.ResolveExisting( + ctx, + targetName.ToUnresolvedObjectName(), + descResolver, + tree.ObjectLookupFlags{}, + p.CurrentDatabase(), + p.CurrentSearchPath(), + ) + if err != nil { + return err + } + if !found { + return pgerror.Newf(pgcode.InvalidParameterValue, `target %q does not exist`, tree.ErrString(targetPattern)) + } + delete(newDescs, desc.GetID()) + } + case *tree.AlterChangefeedSetOptions: + optsFn, err := p.TypeAsStringOpts(ctx, v.Options, changefeedbase.ChangefeedOptionExpectValues) + if err != nil { + return err + } - for _, targetList := range opts.DropTargets { - descs, err := getTableDescriptors(ctx, p, &targetList, statementTime, initialHighWater) + opts, err := optsFn() if err != nil { return err } - targetDescs = append(targetDescs, descs...) 
- } - for _, desc := range targetDescs { - if table, isTable := desc.(catalog.TableDescriptor); isTable { - if err := p.CheckPrivilege(ctx, desc, privilege.SELECT); err != nil { - return err + for key, value := range opts { + if _, ok := changefeedbase.ChangefeedOptionExpectValues[key]; !ok { + return pgerror.Newf(pgcode.InvalidParameterValue, `invalid option %q`, key) + } + if _, ok := changefeedbase.AlterChangefeedUnsupportedOptions[key]; ok { + return pgerror.Newf(pgcode.InvalidParameterValue, `cannot alter option %q`, key) } - delete(details.Tables, table.GetID()) + opt := tree.KVOption{Key: tree.Name(key)} + if len(value) > 0 { + opt.Value = tree.NewDString(value) + } + optionsMap[key] = opt } - } - - newTargetSpecifications := make([]jobspb.ChangefeedTargetSpecification, len(details.TargetSpecifications)-len(opts.DropTargets)) - for _, ts := range details.TargetSpecifications { - if _, stillThere := details.Tables[ts.TableID]; stillThere { - newTargetSpecifications = append(newTargetSpecifications, ts) + case *tree.AlterChangefeedUnsetOptions: + optKeys := v.Options.ToStrings() + for _, key := range optKeys { + if _, ok := changefeedbase.ChangefeedOptionExpectValues[key]; !ok { + return pgerror.Newf(pgcode.InvalidParameterValue, `invalid option %q`, key) + } + if _, ok := changefeedbase.AlterChangefeedUnsupportedOptions[key]; ok { + return pgerror.Newf(pgcode.InvalidParameterValue, `cannot alter option %q`, key) + } + delete(optionsMap, key) } } - details.TargetSpecifications = newTargetSpecifications + } + if len(newDescs) == 0 { + return pgerror.Newf(pgcode.InvalidParameterValue, "cannot drop all targets for changefeed job %d", jobID) } - if len(details.Tables) == 0 { - return errors.Errorf("cannot drop all targets for changefeed job %d", jobID) + for _, targetName := range newDescs { + newChangefeedStmt.Targets.Tables = append(newChangefeedStmt.Targets.Tables, targetName) } - if err := validateSink(ctx, p, jobID, details, details.Opts); err != nil { - return err + for _, val := range optionsMap { + newChangefeedStmt.Options = append(newChangefeedStmt.Options, val) } - oldStmt, err := parser.ParseOne(job.Payload().Description) + sinkURIFn, err := p.TypeAsString(ctx, newChangefeedStmt.SinkURI, `ALTER CHANGEFEED`) if err != nil { return err } - oldChangefeedStmt, ok := oldStmt.AST.(*tree.CreateChangefeed) - if !ok { - return errors.Errorf(`could not parse create changefeed statement for job %d`, jobID) - } - var targets tree.TargetList - for _, target := range details.Tables { - targetName := tree.MakeTableNameFromPrefix(tree.ObjectNamePrefix{}, tree.Name(target.StatementTimeName)) - targets.Tables = append(targets.Tables, &targetName) + optsFn, err := p.TypeAsStringOpts(ctx, newChangefeedStmt.Options, changefeedbase.ChangefeedOptionExpectValues) + if err != nil { + return err } - oldChangefeedStmt.Targets = targets - jobDescription := tree.AsString(oldChangefeedStmt) - - newPayload := job.Payload() - newPayload.Description = jobDescription - newPayload.Details = jobspb.WrapPayloadDetails(details) + sinkURI, err := sinkURIFn() + if err != nil { + return err + } - finalDescs, err := getTableDescriptors(ctx, p, &targets, statementTime, initialHighWater) + opts, err := optsFn() if err != nil { return err } - newPayload.DescriptorIDs = func() (sqlDescIDs []descpb.ID) { - for _, desc := range finalDescs { - sqlDescIDs = append(sqlDescIDs, desc.GetID()) - } - return sqlDescIDs - }() + jobRecord, err := createChangefeedJobRecord( + ctx, + p, + newChangefeedStmt, + sinkURI, + opts, + jobID, 
+ ``, + ) + if err != nil { + return errors.Wrap(err, `failed to alter changefeed`) + } + + newDetails := jobRecord.Details.(jobspb.ChangefeedDetails) + + // We need to persist the statement time that was generated during the + // creation of the changefeed + newDetails.StatementTime = prevDetails.StatementTime + + newPayload := job.Payload() + newPayload.Details = jobspb.WrapPayloadDetails(newDetails) + newPayload.Description = jobRecord.Description + newPayload.DescriptorIDs = jobRecord.DescriptorIDs err = p.ExecCfg().JobRegistry.UpdateJobWithTxn(ctx, jobID, p.ExtendedEvalContext().Txn, lockForUpdate, func( txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater, @@ -203,7 +272,7 @@ func alterChangefeedPlanHook( return ctx.Err() case resultsCh <- tree.Datums{ tree.NewDInt(tree.DInt(jobID)), - tree.NewDString(jobDescription), + tree.NewDString(jobRecord.Description), }: return nil } @@ -211,3 +280,16 @@ func alterChangefeedPlanHook( return fn, header, nil, false, nil } + +func getTargetName(targetPattern tree.TablePattern) (*tree.TableName, error) { + pattern, err := targetPattern.NormalizeTablePattern() + if err != nil { + return nil, err + } + targetName, ok := pattern.(*tree.TableName) + if !ok { + return nil, errors.Errorf(`CHANGEFEED cannot target %q`, tree.AsString(targetPattern)) + } + + return targetName, nil +} diff --git a/pkg/ccl/changefeedccl/alter_changefeed_test.go b/pkg/ccl/changefeedccl/alter_changefeed_test.go index fc1a7a56fa85..f5ea7b580928 100644 --- a/pkg/ccl/changefeedccl/alter_changefeed_test.go +++ b/pkg/ccl/changefeedccl/alter_changefeed_test.go @@ -9,12 +9,16 @@ package changefeedccl import ( + "context" gosql "database/sql" "fmt" "testing" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest" + "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/sql/tests" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -93,6 +97,68 @@ func TestAlterChangefeedDropTarget(t *testing.T) { t.Run(`kafka`, kafkaTest(testFn)) } +func TestAlterChangefeedSetDiffOption(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) { + sqlDB := sqlutils.MakeSQLRunner(db) + sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`) + + testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo`) + defer closeFeed(t, testFeed) + + feed, ok := testFeed.(cdctest.EnterpriseTestFeed) + require.True(t, ok) + + sqlDB.Exec(t, `PAUSE JOB $1`, feed.JobID()) + waitForJobStatus(sqlDB, t, feed.JobID(), `paused`) + + sqlDB.Exec(t, fmt.Sprintf(`ALTER CHANGEFEED %d SET diff`, feed.JobID())) + + sqlDB.Exec(t, fmt.Sprintf(`RESUME JOB %d`, feed.JobID())) + waitForJobStatus(sqlDB, t, feed.JobID(), `running`) + + sqlDB.Exec(t, `INSERT INTO foo VALUES (0, 'initial')`) + assertPayloads(t, testFeed, []string{ + `foo: [0]->{"after": {"a": 0, "b": "initial"}, "before": null}`, + }) + } + + t.Run(`kafka`, kafkaTest(testFn)) +} + +func TestAlterChangefeedUnsetDiffOption(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) { + sqlDB := sqlutils.MakeSQLRunner(db) + sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`) + + testFeed := feed(t, f, `CREATE CHANGEFEED 
FOR foo WITH diff`) + defer closeFeed(t, testFeed) + + feed, ok := testFeed.(cdctest.EnterpriseTestFeed) + require.True(t, ok) + + sqlDB.Exec(t, `PAUSE JOB $1`, feed.JobID()) + waitForJobStatus(sqlDB, t, feed.JobID(), `paused`) + + sqlDB.Exec(t, fmt.Sprintf(`ALTER CHANGEFEED %d UNSET diff`, feed.JobID())) + + sqlDB.Exec(t, fmt.Sprintf(`RESUME JOB %d`, feed.JobID())) + waitForJobStatus(sqlDB, t, feed.JobID(), `running`) + + sqlDB.Exec(t, `INSERT INTO foo VALUES (0, 'initial')`) + assertPayloads(t, testFeed, []string{ + `foo: [0]->{"after": {"a": 0, "b": "initial"}}`, + }) + } + + t.Run(`kafka`, kafkaTest(testFn)) +} + func TestAlterChangefeedErrors(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -124,6 +190,34 @@ func TestAlterChangefeedErrors(t *testing.T) { fmt.Sprintf(`job %d is not paused`, feed.JobID()), fmt.Sprintf(`ALTER CHANGEFEED %d ADD bar`, feed.JobID()), ) + + sqlDB.Exec(t, `PAUSE JOB $1`, feed.JobID()) + waitForJobStatus(sqlDB, t, feed.JobID(), `paused`) + + sqlDB.ExpectErr(t, + `pq: target "baz" does not exist`, + fmt.Sprintf(`ALTER CHANGEFEED %d ADD baz`, feed.JobID()), + ) + sqlDB.ExpectErr(t, + `pq: target "baz" does not exist`, + fmt.Sprintf(`ALTER CHANGEFEED %d DROP baz`, feed.JobID()), + ) + sqlDB.ExpectErr(t, + `pq: invalid option "qux"`, + fmt.Sprintf(`ALTER CHANGEFEED %d SET qux`, feed.JobID()), + ) + sqlDB.ExpectErr(t, + `pq: cannot alter option "initial_scan"`, + fmt.Sprintf(`ALTER CHANGEFEED %d SET initial_scan`, feed.JobID()), + ) + sqlDB.ExpectErr(t, + `pq: invalid option "qux"`, + fmt.Sprintf(`ALTER CHANGEFEED %d UNSET qux`, feed.JobID()), + ) + sqlDB.ExpectErr(t, + `pq: cannot alter option "initial_scan"`, + fmt.Sprintf(`ALTER CHANGEFEED %d UNSET initial_scan`, feed.JobID()), + ) } t.Run(`kafka`, kafkaTest(testFn)) @@ -155,3 +249,58 @@ func TestAlterChangefeedDropAllTargetsError(t *testing.T) { t.Run(`kafka`, kafkaTest(testFn)) } + +// The purpose of this test is to ensure that the ALTER CHANGEFEED statement +// does not accidentally redact secret keys in the changefeed details +func TestAlterChangefeedPersistSinkURI(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + params, _ := tests.CreateTestServerParams() + s, rawSQLDB, _ := serverutils.StartServer(t, params) + sqlDB := sqlutils.MakeSQLRunner(rawSQLDB) + registry := s.JobRegistry().(*jobs.Registry) + ctx := context.Background() + defer s.Stopper().Stop(ctx) + + query := `CREATE TABLE foo (a string)` + sqlDB.Exec(t, query) + + query = `CREATE TABLE bar (b string)` + sqlDB.Exec(t, query) + + query = `SET CLUSTER SETTING kv.rangefeed.enabled = true` + sqlDB.Exec(t, query) + + var changefeedID jobspb.JobID + + doneCh := make(chan struct{}) + defer close(doneCh) + registry.TestingResumerCreationKnobs = map[jobspb.Type]func(raw jobs.Resumer) jobs.Resumer{ + jobspb.TypeChangefeed: func(raw jobs.Resumer) jobs.Resumer { + r := fakeResumer{ + done: doneCh, + } + return &r + }, + } + + query = `CREATE CHANGEFEED FOR TABLE foo, bar INTO + 's3://fake-bucket-name/fake/path?AWS_ACCESS_KEY_ID=123&AWS_SECRET_ACCESS_KEY=456'` + sqlDB.QueryRow(t, query).Scan(&changefeedID) + + sqlDB.Exec(t, `PAUSE JOB $1`, changefeedID) + waitForJobStatus(sqlDB, t, changefeedID, `paused`) + + sqlDB.Exec(t, fmt.Sprintf(`ALTER CHANGEFEED %d SET diff`, changefeedID)) + + sqlDB.Exec(t, fmt.Sprintf(`RESUME JOB %d`, changefeedID)) + waitForJobStatus(sqlDB, t, changefeedID, `running`) + + job, err := registry.LoadJob(ctx, changefeedID) + require.NoError(t, err) + details, ok := 
job.Details().(jobspb.ChangefeedDetails) + require.True(t, ok) + + require.Equal(t, details.SinkURI, `s3://fake-bucket-name/fake/path?AWS_ACCESS_KEY_ID=123&AWS_SECRET_ACCESS_KEY=456`) +} diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index 745d350deaa5..23012b33e9c1 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -129,6 +129,7 @@ func changefeedPlanHook( if err != nil { return err } + if !unspecifiedSink && sinkURI == `` { // Error if someone specifies an INTO with the empty string. We've // already sent the wrong result column headers. @@ -140,59 +141,20 @@ func changefeedPlanHook( return err } - for key, value := range opts { - // if option is case insensitive then convert its value to lower case - if _, ok := changefeedbase.CaseInsensitiveOpts[key]; ok { - opts[key] = strings.ToLower(value) - } - } - - if newFormat, ok := changefeedbase.NoLongerExperimental[opts[changefeedbase.OptFormat]]; ok { - p.BufferClientNotice(ctx, pgnotice.Newf( - `%[1]s is no longer experimental, use %[2]s=%[1]s`, - newFormat, changefeedbase.OptFormat), - ) - // Still serialize the experimental_ form for backwards compatibility - } - - jobDescription, err := changefeedJobDescription(p, changefeedStmt, sinkURI, opts) - if err != nil { - return err - } - - statementTime := hlc.Timestamp{ - WallTime: p.ExtendedEvalContext().GetStmtTimestamp().UnixNano(), - } - var initialHighWater hlc.Timestamp - if cursor, ok := opts[changefeedbase.OptCursor]; ok { - asOfClause := tree.AsOfClause{Expr: tree.NewStrVal(cursor)} - var err error - asOf, err := p.EvalAsOfTimestamp(ctx, asOfClause) - if err != nil { - return err - } - initialHighWater = asOf.Timestamp - statementTime = initialHighWater - } - - // This grabs table descriptors once to get their ids. - targetDescs, err := getTableDescriptors(ctx, p, &changefeedStmt.Targets, statementTime, initialHighWater) - if err != nil { - return err - } - - targets, tables, err := getTargetsAndTables(ctx, p, targetDescs, opts) + jr, err := createChangefeedJobRecord( + ctx, + p, + changefeedStmt, + sinkURI, + opts, + jobspb.InvalidJobID, + `changefeed.create`, + ) if err != nil { return err } - details := jobspb.ChangefeedDetails{ - Tables: tables, - Opts: opts, - SinkURI: sinkURI, - StatementTime: statementTime, - TargetSpecifications: targets, - } + details := jr.Details.(jobspb.ChangefeedDetails) progress := jobspb.Progress{ Progress: &jobspb.Progress_HighWater{}, Details: &jobspb.Progress_Changefeed{ @@ -200,97 +162,6 @@ func changefeedPlanHook( }, } - // TODO(dan): In an attempt to present the most helpful error message to the - // user, the ordering requirements between all these usage validations have - // become extremely fragile and non-obvious. - // - // - `validateDetails` has to run first to fill in defaults for `envelope` - // and `format` if the user didn't specify them. - // - Then `getEncoder` is run to return any configuration errors. - // - Then the changefeed is opted in to `OptKeyInValue` for any cloud - // storage sink or webhook sink. Kafka etc have a key and value field in - // each message but cloud storage sinks and webhook sinks don't have - // anywhere to put the key. So if the key is not in the value, then for - // DELETEs there is no way to recover which key was deleted. 
We could make - // the user explicitly pass this option for every cloud storage sink/ - // webhook sink and error if they don't, but that seems user-hostile for - // insufficient reason. We can't do this any earlier, because we might - // return errors about `key_in_value` being incompatible which is - // confusing when the user didn't type that option. - // This is the same for the topic and webhook sink, which uses - // `topic_in_value` to embed the topic in the value by default, since it - // has no other avenue to express the topic. - // - Finally, we create a "canary" sink to test sink configuration and - // connectivity. This has to go last because it is strange to return sink - // connectivity errors before we've finished validating all the other - // options. We should probably split sink configuration checking and sink - // connectivity checking into separate methods. - // - // The only upside in all this nonsense is the tests are decent. I've tuned - // this particular order simply by rearranging stuff until the changefeedccl - // tests all pass. - parsedSink, err := url.Parse(sinkURI) - if err != nil { - return err - } - if newScheme, ok := changefeedbase.NoLongerExperimental[parsedSink.Scheme]; ok { - parsedSink.Scheme = newScheme // This gets munged anyway when building the sink - p.BufferClientNotice(ctx, pgnotice.Newf(`%[1]s is no longer experimental, use %[1]s://`, - newScheme), - ) - } - - if details, err = validateDetails(details); err != nil { - return err - } - - if _, err := getEncoder(details.Opts, AllTargets(details)); err != nil { - return err - } - - if isCloudStorageSink(parsedSink) || isWebhookSink(parsedSink) { - details.Opts[changefeedbase.OptKeyInValue] = `` - } - if isWebhookSink(parsedSink) { - details.Opts[changefeedbase.OptTopicInValue] = `` - } - - if !unspecifiedSink && p.ExecCfg().ExternalIODirConfig.DisableOutbound { - return errors.Errorf("Outbound IO is disabled by configuration, cannot create changefeed into %s", parsedSink.Scheme) - } - - if _, shouldProtect := details.Opts[changefeedbase.OptProtectDataFromGCOnPause]; shouldProtect && !p.ExecCfg().Codec.ForSystemTenant() { - return errorutil.UnsupportedWithMultiTenancy(67271) - } - - // Feature telemetry - telemetrySink := parsedSink.Scheme - if telemetrySink == `` { - telemetrySink = `sinkless` - } - telemetry.Count(`changefeed.create.sink.` + telemetrySink) - telemetry.Count(`changefeed.create.format.` + details.Opts[changefeedbase.OptFormat]) - telemetry.CountBucketed(`changefeed.create.num_tables`, int64(len(tables))) - - if scope, ok := opts[changefeedbase.OptMetricsScope]; ok { - if err := utilccl.CheckEnterpriseEnabled( - p.ExecCfg().Settings, p.ExecCfg().ClusterID(), p.ExecCfg().Organization(), "CHANGEFEED", - ); err != nil { - return errors.Wrapf(err, - "use of %q option requires enterprise license.", changefeedbase.OptMetricsScope) - } - - if scope == defaultSLIScope { - return pgerror.Newf(pgcode.InvalidParameterValue, - "%[1]q=%[2]q is the default metrics scope which keeps track of statistics "+ - "across all changefeeds without explicit label. "+ - "If this is an intended behavior, please re-run the statement "+ - "without specifying %[1]q parameter. 
"+ - "Otherwise, please re-run with a different %[1]q value.", - changefeedbase.OptMetricsScope, defaultSLIScope) - } - } - if details.SinkURI == `` { telemetry.Count(`changefeed.create.core`) err := distChangefeedFlow(ctx, p, 0 /* jobID */, details, progress, resultsCh) @@ -300,23 +171,6 @@ func changefeedPlanHook( return changefeedbase.MaybeStripRetryableErrorMarker(err) } - if err := utilccl.CheckEnterpriseEnabled( - p.ExecCfg().Settings, p.ExecCfg().ClusterID(), p.ExecCfg().Organization(), "CHANGEFEED", - ); err != nil { - return err - } - - telemetry.Count(`changefeed.create.enterprise`) - - // In the case where a user is executing a CREATE CHANGEFEED and is still - // waiting for the statement to return, we take the opportunity to ensure - // that the user has not made any obvious errors when specifying the sink in - // the CREATE CHANGEFEED statement. To do this, we create a "canary" sink, - // which will be immediately closed, only to check for errors. - if err := validateSink(ctx, p, jobspb.InvalidJobID, details, opts); err != nil { - return err - } - // The below block creates the job and protects the data required for the // changefeed to function from being garbage collected even if the // changefeed lags behind the gcttl. We protect the data here rather than in @@ -333,25 +187,14 @@ func changefeedPlanHook( var protectedTimestampID uuid.UUID codec := p.ExecCfg().Codec if shouldProtectTimestamps(codec) { - ptr = createProtectedTimestampRecord(ctx, codec, jobID, AllTargets(details), statementTime, progress.GetChangefeed()) + ptr = createProtectedTimestampRecord(ctx, codec, jobID, AllTargets(details), details.StatementTime, progress.GetChangefeed()) protectedTimestampID = ptr.ID.GetUUID() } - jr := jobs.Record{ - Description: jobDescription, - Username: p.User(), - DescriptorIDs: func() (sqlDescIDs []descpb.ID) { - for _, desc := range targetDescs { - sqlDescIDs = append(sqlDescIDs, desc.GetID()) - } - return sqlDescIDs - }(), - Details: details, - Progress: *progress.GetChangefeed(), - } + jr.Progress = *progress.GetChangefeed() if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - if err := p.ExecCfg().JobRegistry.CreateStartableJobWithTxn(ctx, &sj, jobID, txn, jr); err != nil { + if err := p.ExecCfg().JobRegistry.CreateStartableJobWithTxn(ctx, &sj, jobID, txn, *jr); err != nil { return err } if ptr != nil { @@ -399,6 +242,209 @@ func changefeedPlanHook( return fn, header, nil, avoidBuffering, nil } +func createChangefeedJobRecord( + ctx context.Context, + p sql.PlanHookState, + changefeedStmt *tree.CreateChangefeed, + sinkURI string, + opts map[string]string, + jobID jobspb.JobID, + telemetryPath string, +) (*jobs.Record, error) { + unspecifiedSink := changefeedStmt.SinkURI == nil + + for key, value := range opts { + // if option is case insensitive then convert its value to lower case + if _, ok := changefeedbase.CaseInsensitiveOpts[key]; ok { + opts[key] = strings.ToLower(value) + } + } + + if newFormat, ok := changefeedbase.NoLongerExperimental[opts[changefeedbase.OptFormat]]; ok { + p.BufferClientNotice(ctx, pgnotice.Newf( + `%[1]s is no longer experimental, use %[2]s=%[1]s`, + newFormat, changefeedbase.OptFormat), + ) + // Still serialize the experimental_ form for backwards compatibility + } + + jobDescription, err := changefeedJobDescription(p, changefeedStmt, sinkURI, opts) + if err != nil { + return nil, err + } + + statementTime := hlc.Timestamp{ + WallTime: p.ExtendedEvalContext().GetStmtTimestamp().UnixNano(), + } + var initialHighWater 
hlc.Timestamp + if cursor, ok := opts[changefeedbase.OptCursor]; ok { + asOfClause := tree.AsOfClause{Expr: tree.NewStrVal(cursor)} + var err error + asOf, err := p.EvalAsOfTimestamp(ctx, asOfClause) + if err != nil { + return nil, err + } + initialHighWater = asOf.Timestamp + statementTime = initialHighWater + } + + // This grabs table descriptors once to get their ids. + targetDescs, err := getTableDescriptors(ctx, p, &changefeedStmt.Targets, statementTime, initialHighWater) + if err != nil { + return nil, err + } + + targets, tables, err := getTargetsAndTables(ctx, p, targetDescs, opts) + if err != nil { + return nil, err + } + + details := jobspb.ChangefeedDetails{ + Tables: tables, + Opts: opts, + SinkURI: sinkURI, + StatementTime: statementTime, + TargetSpecifications: targets, + } + + // TODO(dan): In an attempt to present the most helpful error message to the + // user, the ordering requirements between all these usage validations have + // become extremely fragile and non-obvious. + // + // - `validateDetails` has to run first to fill in defaults for `envelope` + // and `format` if the user didn't specify them. + // - Then `getEncoder` is run to return any configuration errors. + // - Then the changefeed is opted in to `OptKeyInValue` for any cloud + // storage sink or webhook sink. Kafka etc have a key and value field in + // each message but cloud storage sinks and webhook sinks don't have + // anywhere to put the key. So if the key is not in the value, then for + // DELETEs there is no way to recover which key was deleted. We could make + // the user explicitly pass this option for every cloud storage sink/ + // webhook sink and error if they don't, but that seems user-hostile for + // insufficient reason. We can't do this any earlier, because we might + // return errors about `key_in_value` being incompatible which is + // confusing when the user didn't type that option. + // This is the same for the topic and webhook sink, which uses + // `topic_in_value` to embed the topic in the value by default, since it + // has no other avenue to express the topic. + // - Finally, we create a "canary" sink to test sink configuration and + // connectivity. This has to go last because it is strange to return sink + // connectivity errors before we've finished validating all the other + // options. We should probably split sink configuration checking and sink + // connectivity checking into separate methods. + // + // The only upside in all this nonsense is the tests are decent. I've tuned + // this particular order simply by rearranging stuff until the changefeedccl + // tests all pass. 
+ parsedSink, err := url.Parse(sinkURI) + if err != nil { + return nil, err + } + if newScheme, ok := changefeedbase.NoLongerExperimental[parsedSink.Scheme]; ok { + parsedSink.Scheme = newScheme // This gets munged anyway when building the sink + p.BufferClientNotice(ctx, pgnotice.Newf(`%[1]s is no longer experimental, use %[1]s://`, + newScheme), + ) + } + + if details, err = validateDetails(details); err != nil { + return nil, err + } + + if _, err := getEncoder(details.Opts, AllTargets(details)); err != nil { + return nil, err + } + + if isCloudStorageSink(parsedSink) || isWebhookSink(parsedSink) { + details.Opts[changefeedbase.OptKeyInValue] = `` + } + if isWebhookSink(parsedSink) { + details.Opts[changefeedbase.OptTopicInValue] = `` + } + + if !unspecifiedSink && p.ExecCfg().ExternalIODirConfig.DisableOutbound { + return nil, errors.Errorf("Outbound IO is disabled by configuration, cannot create changefeed into %s", parsedSink.Scheme) + } + + if _, shouldProtect := details.Opts[changefeedbase.OptProtectDataFromGCOnPause]; shouldProtect && !p.ExecCfg().Codec.ForSystemTenant() { + return nil, errorutil.UnsupportedWithMultiTenancy(67271) + } + + if telemetryPath != `` { + // Feature telemetry + telemetrySink := parsedSink.Scheme + if telemetrySink == `` { + telemetrySink = `sinkless` + } + telemetry.Count(telemetryPath + `.sink.` + telemetrySink) + telemetry.Count(telemetryPath + `.format.` + details.Opts[changefeedbase.OptFormat]) + telemetry.CountBucketed(telemetryPath+`.num_tables`, int64(len(tables))) + } + + if scope, ok := opts[changefeedbase.OptMetricsScope]; ok { + if err := utilccl.CheckEnterpriseEnabled( + p.ExecCfg().Settings, p.ExecCfg().ClusterID(), p.ExecCfg().Organization(), "CHANGEFEED", + ); err != nil { + return nil, errors.Wrapf(err, + "use of %q option requires enterprise license.", changefeedbase.OptMetricsScope) + } + + if scope == defaultSLIScope { + return nil, pgerror.Newf(pgcode.InvalidParameterValue, + "%[1]q=%[2]q is the default metrics scope which keeps track of statistics "+ + "across all changefeeds without explicit label. "+ + "If this is an intended behavior, please re-run the statement "+ + "without specifying %[1]q parameter. "+ + "Otherwise, please re-run with a different %[1]q value.", + changefeedbase.OptMetricsScope, defaultSLIScope) + } + } + + if details.SinkURI == `` { + // Jobs should not be created for sinkless changefeeds. However, note that + // we create and return a job record for sinkless changefeeds below. This is + // because we need the details field to create our sinkless changefeed. + // After this job record is returned, we create our forever running sinkless + // changefeed, thus ensuring that no job is created for this changefeed as + // desired. + sinklessRecord := &jobs.Record{ + Details: details, + } + return sinklessRecord, nil + } + + if err := utilccl.CheckEnterpriseEnabled( + p.ExecCfg().Settings, p.ExecCfg().ClusterID(), p.ExecCfg().Organization(), "CHANGEFEED", + ); err != nil { + return nil, err + } + + if telemetryPath != `` { + telemetry.Count(telemetryPath + `.enterprise`) + } + + // In the case where a user is executing a CREATE CHANGEFEED and is still + // waiting for the statement to return, we take the opportunity to ensure + // that the user has not made any obvious errors when specifying the sink in + // the CREATE CHANGEFEED statement. To do this, we create a "canary" sink, + // which will be immediately closed, only to check for errors. 
+ err = validateSink(ctx, p, jobID, details, opts) + + jr := &jobs.Record{ + Description: jobDescription, + Username: p.User(), + DescriptorIDs: func() (sqlDescIDs []descpb.ID) { + for _, desc := range targetDescs { + sqlDescIDs = append(sqlDescIDs, desc.GetID()) + } + return sqlDescIDs + }(), + Details: details, + } + + return jr, err +} + func validateSettings(ctx context.Context, p sql.PlanHookState) error { if err := featureflag.CheckEnabled( ctx, diff --git a/pkg/ccl/changefeedccl/changefeedbase/options.go b/pkg/ccl/changefeedccl/changefeedbase/options.go index 543a042d3f04..50d506927af3 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/options.go +++ b/pkg/ccl/changefeedccl/changefeedbase/options.go @@ -233,3 +233,7 @@ var NoLongerExperimental = map[string]string{ DeprecatedSinkSchemeCloudStorageNodelocal: SinkSchemeCloudStorageNodelocal, DeprecatedSinkSchemeCloudStorageS3: SinkSchemeCloudStorageS3, } + +// AlterChangefeedUnsupportedOptions are changefeed options that we do not allow +// users to alter +var AlterChangefeedUnsupportedOptions = makeStringSet(OptCursor, OptInitialScan, OptNoInitialScan) diff --git a/pkg/ccl/changefeedccl/show_changefeed_jobs_test.go b/pkg/ccl/changefeedccl/show_changefeed_jobs_test.go index b358cef8ed18..33ecca26a33c 100644 --- a/pkg/ccl/changefeedccl/show_changefeed_jobs_test.go +++ b/pkg/ccl/changefeedccl/show_changefeed_jobs_test.go @@ -362,41 +362,51 @@ func TestShowChangefeedJobsAlterChangefeed(t *testing.T) { feed, ok := foo.(cdctest.EnterpriseTestFeed) require.True(t, ok) - sqlDB.Exec(t, `PAUSE JOB $1`, feed.JobID()) - waitForJobStatus(sqlDB, t, feed.JobID(), `paused`) - - sqlDB.Exec(t, fmt.Sprintf(`ALTER CHANGEFEED %d ADD bar`, feed.JobID())) + jobID := feed.JobID() + details, err := feed.Details() + require.NoError(t, err) + sinkURI := details.SinkURI type row struct { id jobspb.JobID + description string SinkURI string FullTableNames []uint8 format string topics string } - var out row - - query := `SELECT job_id, sink_uri, full_table_names, format, IFNULL(topics, '') FROM [SHOW CHANGEFEED JOBS] ORDER BY sink_uri` - rowResults := sqlDB.Query(t, query) - - if !rowResults.Next() { - err := rowResults.Err() + obtainJobRowFn := func() row { + var out row + + query := fmt.Sprintf( + `SELECT job_id, description, sink_uri, full_table_names, format, IFNULL(topics, '') FROM [SHOW CHANGEFEED JOB %d]`, + jobID, + ) + + rowResults := sqlDB.Query(t, query) + if !rowResults.Next() { + err := rowResults.Err() + if err != nil { + t.Fatalf("Error encountered while querying the next row: %v", err) + } else { + t.Fatalf("Expected more rows when querying and none found for query: %s", query) + } + } + err := rowResults.Scan(&out.id, &out.description, &out.SinkURI, &out.FullTableNames, &out.format, &out.topics) if err != nil { - t.Fatalf("Error encountered while querying the next row: %v", err) - } else { - t.Fatalf("Expected more rows when querying and none found for query: %s", query) + t.Fatal(err) } - } - err := rowResults.Scan(&out.id, &out.SinkURI, &out.FullTableNames, &out.format, &out.topics) - if err != nil { - t.Fatal(err) + + return out } - details, err := feed.Details() - require.NoError(t, err) - sinkURI := details.SinkURI - jobID := feed.JobID() + sqlDB.Exec(t, `PAUSE JOB $1`, jobID) + waitForJobStatus(sqlDB, t, jobID, `paused`) + + sqlDB.Exec(t, fmt.Sprintf(`ALTER CHANGEFEED %d ADD bar`, jobID)) + + out := obtainJobRowFn() topicsArr := strings.Split(out.topics, ",") sort.Strings(topicsArr) @@ -408,32 +418,26 @@ func 
TestShowChangefeedJobsAlterChangefeed(t *testing.T) { require.Equal(t, "json", out.format, "Expected format:%s but found format:%s", "json", out.format) sqlDB.Exec(t, fmt.Sprintf(`ALTER CHANGEFEED %d DROP foo`, feed.JobID())) - rowResults = sqlDB.Query(t, query) - - if !rowResults.Next() { - err := rowResults.Err() - if err != nil { - t.Fatalf("Error encountered while querying the next row: %v", err) - } else { - t.Fatalf("Expected more rows when querying and none found for query: %s", query) - } - } - err = rowResults.Scan(&out.id, &out.SinkURI, &out.FullTableNames, &out.format, &out.topics) - if err != nil { - t.Fatal(err) - } - details, err = feed.Details() - require.NoError(t, err) - sinkURI = details.SinkURI - jobID = feed.JobID() + out = obtainJobRowFn() require.Equal(t, jobID, out.id, "Expected id:%d but found id:%d", jobID, out.id) + require.Equal(t, "CREATE CHANGEFEED FOR TABLE bar INTO 'kafka://does.not.matter/' WITH envelope = 'wrapped', format = 'json', on_error = 'fail', schema_change_events = 'default', schema_change_policy = 'backfill', virtual_columns = 'omitted'", out.description, "Expected description:%s but found description:%s", "CREATE CHANGEFEED FOR TABLE bar INTO 'kafka://does.not.matter/'", out.description) require.Equal(t, sinkURI, out.SinkURI, "Expected sinkUri:%s but found sinkUri:%s", sinkURI, out.SinkURI) - require.Equal(t, "bar", out.topics, "Expected topics:%s but found topics:%s", "bar,foo", sortedTopics) - require.Equal(t, "{d.public.bar}", string(out.FullTableNames), "Expected fullTableNames:%s but found fullTableNames:%s", "{d.public.foo,d.public.bar}", string(out.FullTableNames)) + require.Equal(t, "bar", out.topics, "Expected topics:%s but found topics:%s", "bar", sortedTopics) + require.Equal(t, "{d.public.bar}", string(out.FullTableNames), "Expected fullTableNames:%s but found fullTableNames:%s", "{d.public.bar}", string(out.FullTableNames)) require.Equal(t, "json", out.format, "Expected format:%s but found format:%s", "json", out.format) + sqlDB.Exec(t, fmt.Sprintf(`ALTER CHANGEFEED %d SET resolved = '5s'`, feed.JobID())) + + out = obtainJobRowFn() + + require.Equal(t, jobID, out.id, "Expected id:%d but found id:%d", jobID, out.id) + require.Equal(t, "CREATE CHANGEFEED FOR TABLE bar INTO 'kafka://does.not.matter/' WITH envelope = 'wrapped', format = 'json', on_error = 'fail', resolved = '5s', schema_change_events = 'default', schema_change_policy = 'backfill', virtual_columns = 'omitted'", out.description, "Expected description:%s but found description:%s", "CREATE CHANGEFEED FOR TABLE bar INTO 'kafka://does.not.matter/'", out.description) + require.Equal(t, sinkURI, out.SinkURI, "Expected sinkUri:%s but found sinkUri:%s", sinkURI, out.SinkURI) + require.Equal(t, "bar", out.topics, "Expected topics:%s but found topics:%s", "bar", sortedTopics) + require.Equal(t, "{d.public.bar}", string(out.FullTableNames), "Expected fullTableNames:%s but found fullTableNames:%s", "{d.public.bar}", string(out.FullTableNames)) + require.Equal(t, "json", out.format, "Expected format:%s but found format:%s", "json", out.format) } t.Run(`kafka`, kafkaTest(testFn)) diff --git a/pkg/sql/parser/sql.y b/pkg/sql/parser/sql.y index cdef75d7c5b5..a562b8fec17e 100644 --- a/pkg/sql/parser/sql.y +++ b/pkg/sql/parser/sql.y @@ -887,7 +887,7 @@ func (u *sqlSymUnion) fetchCursor() *tree.FetchCursor { %token TRACING %token UNBOUNDED UNCOMMITTED UNION UNIQUE UNKNOWN UNLOGGED UNSPLIT -%token UPDATE UPSERT UNTIL USE USER USERS USING UUID +%token UPDATE UPSERT UNSET UNTIL USE USER USERS 
USING UUID %token VALID VALIDATE VALUE VALUES VARBIT VARCHAR VARIADIC VIEW VARYING VIEWACTIVITY VIEWACTIVITYREDACTED %token VIEWCLUSTERSETTING VIRTUAL VISIBLE VOTERS @@ -4376,7 +4376,7 @@ explain_option_list: // %Help: ALTER CHANGEFEED - alter an existing changefeed // %Category: CCL // %Text: -// ALTER CHANGEFEED {{ADD|DROP} }... +// ALTER CHANGEFEED {{ADD|DROP } | SET }... alter_changefeed_stmt: ALTER CHANGEFEED a_expr alter_changefeed_cmds { @@ -4412,6 +4412,18 @@ alter_changefeed_cmd: Targets: $2.targetList(), } } +| SET kv_option_list + { + $$.val = &tree.AlterChangefeedSetOptions{ + Options: $2.kvOptions(), + } + } +| UNSET name_list + { + $$.val = &tree.AlterChangefeedUnsetOptions{ + Options: $2.nameList(), + } + } // %Help: ALTER BACKUP - alter an existing backup's encryption keys // %Category: CCL @@ -14110,6 +14122,7 @@ unreserved_keyword: | UNCOMMITTED | UNKNOWN | UNLOGGED +| UNSET | UNSPLIT | UNTIL | UPDATE diff --git a/pkg/sql/parser/testdata/alter_changefeed b/pkg/sql/parser/testdata/alter_changefeed index 388715f7fea6..db4c276804c3 100644 --- a/pkg/sql/parser/testdata/alter_changefeed +++ b/pkg/sql/parser/testdata/alter_changefeed @@ -56,3 +56,76 @@ ALTER CHANGEFEED 123 ADD foo DROP bar ADD baz, qux DROP quux -- normalized! ALTER CHANGEFEED (123) ADD (foo) DROP (bar) ADD (baz), (qux) DROP (quux) -- fully parenthesized ALTER CHANGEFEED _ ADD foo DROP bar ADD baz, qux DROP quux -- literals removed ALTER CHANGEFEED 123 ADD _ DROP _ ADD _, _ DROP _ -- identifiers removed + +parse +ALTER CHANGEFEED 123 SET foo = 'bar' +---- +ALTER CHANGEFEED 123 SET foo = 'bar' +ALTER CHANGEFEED (123) SET foo = ('bar') -- fully parenthesized +ALTER CHANGEFEED _ SET foo = '_' -- literals removed +ALTER CHANGEFEED 123 SET _ = 'bar' -- identifiers removed + + +parse +ALTER CHANGEFEED 123 ADD foo SET bar = 'baz', qux = 'quux' +---- +ALTER CHANGEFEED 123 ADD foo SET bar = 'baz', qux = 'quux' -- normalized! +ALTER CHANGEFEED (123) ADD (foo) SET bar = ('baz'), qux = ('quux') -- fully parenthesized +ALTER CHANGEFEED _ ADD foo SET bar = '_', qux = '_' -- literals removed +ALTER CHANGEFEED 123 ADD _ SET _ = 'baz', _ = 'quux' -- identifiers removed + +parse +ALTER CHANGEFEED 123 DROP foo SET bar = 'baz', qux = 'quux' +---- +ALTER CHANGEFEED 123 DROP foo SET bar = 'baz', qux = 'quux' -- normalized! +ALTER CHANGEFEED (123) DROP (foo) SET bar = ('baz'), qux = ('quux') -- fully parenthesized +ALTER CHANGEFEED _ DROP foo SET bar = '_', qux = '_' -- literals removed +ALTER CHANGEFEED 123 DROP _ SET _ = 'baz', _ = 'quux' -- identifiers removed + +parse +ALTER CHANGEFEED 123 SET foo = 'bar' ADD baz DROP qux +---- +ALTER CHANGEFEED 123 SET foo = 'bar' ADD baz DROP qux -- normalized! +ALTER CHANGEFEED (123) SET foo = ('bar') ADD (baz) DROP (qux) -- fully parenthesized +ALTER CHANGEFEED _ SET foo = '_' ADD baz DROP qux -- literals removed +ALTER CHANGEFEED 123 SET _ = 'bar' ADD _ DROP _ -- identifiers removed + +parse +ALTER CHANGEFEED 123 ADD foo SET bar = 'baz', qux = 'quux' DROP corge +---- +ALTER CHANGEFEED 123 ADD foo SET bar = 'baz', qux = 'quux' DROP corge -- normalized! 
+ALTER CHANGEFEED (123) ADD (foo) SET bar = ('baz'), qux = ('quux') DROP (corge) -- fully parenthesized +ALTER CHANGEFEED _ ADD foo SET bar = '_', qux = '_' DROP corge -- literals removed +ALTER CHANGEFEED 123 ADD _ SET _ = 'baz', _ = 'quux' DROP _ -- identifiers removed + +parse +ALTER CHANGEFEED 123 UNSET foo +---- +ALTER CHANGEFEED 123 UNSET foo +ALTER CHANGEFEED (123) UNSET foo -- fully parenthesized +ALTER CHANGEFEED _ UNSET foo -- literals removed +ALTER CHANGEFEED 123 UNSET _ -- identifiers removed + +parse +ALTER CHANGEFEED 123 ADD foo UNSET bar, baz +---- +ALTER CHANGEFEED 123 ADD foo UNSET bar, baz -- normalized! +ALTER CHANGEFEED (123) ADD (foo) UNSET bar, baz -- fully parenthesized +ALTER CHANGEFEED _ ADD foo UNSET bar, baz -- literals removed +ALTER CHANGEFEED 123 ADD _ UNSET _, _ -- identifiers removed + +parse +ALTER CHANGEFEED 123 UNSET foo, bar ADD baz DROP qux +---- +ALTER CHANGEFEED 123 UNSET foo, bar ADD baz DROP qux -- normalized! +ALTER CHANGEFEED (123) UNSET foo, bar ADD (baz) DROP (qux) -- fully parenthesized +ALTER CHANGEFEED _ UNSET foo, bar ADD baz DROP qux -- literals removed +ALTER CHANGEFEED 123 UNSET _, _ ADD _ DROP _ -- identifiers removed + +parse +ALTER CHANGEFEED 123 ADD foo DROP bar SET baz = 'qux' UNSET quux, corge +---- +ALTER CHANGEFEED 123 ADD foo DROP bar SET baz = 'qux' UNSET quux, corge -- normalized! +ALTER CHANGEFEED (123) ADD (foo) DROP (bar) SET baz = ('qux') UNSET quux, corge -- fully parenthesized +ALTER CHANGEFEED _ ADD foo DROP bar SET baz = '_' UNSET quux, corge -- literals removed +ALTER CHANGEFEED 123 ADD _ DROP _ SET _ = 'qux' UNSET _, _ -- identifiers removed diff --git a/pkg/sql/sem/tree/alter_changefeed.go b/pkg/sql/sem/tree/alter_changefeed.go index c4caeae5b931..885c0f54b0d0 100644 --- a/pkg/sql/sem/tree/alter_changefeed.go +++ b/pkg/sql/sem/tree/alter_changefeed.go @@ -46,11 +46,15 @@ type AlterChangefeedCmd interface { alterChangefeedCmd() } -func (*AlterChangefeedAddTarget) alterChangefeedCmd() {} -func (*AlterChangefeedDropTarget) alterChangefeedCmd() {} +func (*AlterChangefeedAddTarget) alterChangefeedCmd() {} +func (*AlterChangefeedDropTarget) alterChangefeedCmd() {} +func (*AlterChangefeedSetOptions) alterChangefeedCmd() {} +func (*AlterChangefeedUnsetOptions) alterChangefeedCmd() {} var _ AlterChangefeedCmd = &AlterChangefeedAddTarget{} var _ AlterChangefeedCmd = &AlterChangefeedDropTarget{} +var _ AlterChangefeedCmd = &AlterChangefeedSetOptions{} +var _ AlterChangefeedCmd = &AlterChangefeedUnsetOptions{} // AlterChangefeedAddTarget represents an ADD command type AlterChangefeedAddTarget struct { @@ -73,3 +77,25 @@ func (node *AlterChangefeedDropTarget) Format(ctx *FmtCtx) { ctx.WriteString(" DROP ") ctx.FormatNode(&node.Targets.Tables) } + +// AlterChangefeedSetOptions represents an SET command +type AlterChangefeedSetOptions struct { + Options KVOptions +} + +// Format implements the NodeFormatter interface. +func (node *AlterChangefeedSetOptions) Format(ctx *FmtCtx) { + ctx.WriteString(" SET ") + ctx.FormatNode(&node.Options) +} + +// AlterChangefeedUnsetOptions represents an UNSET command +type AlterChangefeedUnsetOptions struct { + Options NameList +} + +// Format implements the NodeFormatter interface. +func (node *AlterChangefeedUnsetOptions) Format(ctx *FmtCtx) { + ctx.WriteString(" UNSET ") + ctx.FormatNode(&node.Options) +}
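
For reference, a usage sketch of the syntax this patch introduces, pieced together from the parser testdata and tests above. The job ID 123 and the table and option names are placeholders; the target changefeed must be paused before it can be altered and resumed afterwards:

PAUSE JOB 123
ALTER CHANGEFEED 123 ADD bar                    -- add a new target table
ALTER CHANGEFEED 123 DROP foo                   -- drop an existing target; dropping every target is an error
ALTER CHANGEFEED 123 SET diff, resolved = '5s'  -- set or overwrite changefeed options
ALTER CHANGEFEED 123 UNSET diff                 -- clear previously set options
RESUME JOB 123

SET and UNSET reject unknown option names ("invalid option") as well as the options in AlterChangefeedUnsupportedOptions (cursor, initial_scan, no_initial_scan), which cannot be changed after creation. After an ALTER, the job description is rewritten as an equivalent CREATE CHANGEFEED statement (as the SHOW CHANGEFEED JOB expectations above demonstrate), while the original statement time and the unredacted sink URI are carried over from the existing job details.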