Skip to content

Commit

Permalink
changefeedccl: allow users to alter changefeed options
Browse files Browse the repository at this point in the history
with the ALTER CHANGEFEED statement

Currently, with the ALTER CHANGEFEED statement users
can only add or drop targets from an existing
changefeed. In this PR, we would like to extend this
functionality so that an user can edit and unset
the options of an existing changefeed as well.
The syntax of this addition is the following:

ALTER CHANGEFEED <job_id> SET <options> UNSET <opt_list>

Note that the <options> must follow the same syntax
that is used when creating a changefeed with options.
In particular, if you would like to set an option
that requires a value, you must write

SET opt = 'value'

On the other hand, if you would like to set an
option that requires no value, you must write

SET opt

Furthermore, this PR allows users to unset options.
This can be achieved by writing

UNSET <opt_list>

Where <opt_list> is a list of options that you
would like to unset. For example, if we would like
to unset the diff and resolved options for
changefeed 123, we would achieve this by writing

ALTER CHANGEFEED 123 UNSET diff, resolved

Release note (enterprise change): Added support to
the ALTER CHANGEFEED statement so that users can edit
and unset the options of an existing changefeed. The
syntax of this addition is the following:

ALTER CHANGEFEED <job_id> SET <options> UNSET <opt_list>
  • Loading branch information
Sherman Grewal authored and RajivTS committed Mar 6, 2022
1 parent 8fb0a61 commit 949af29
Show file tree
Hide file tree
Showing 10 changed files with 699 additions and 300 deletions.
3 changes: 3 additions & 0 deletions docs/generated/sql/bnf/stmt_block.bnf
Original file line number Diff line number Diff line change
Expand Up @@ -1258,6 +1258,7 @@ unreserved_keyword ::=
| 'UNCOMMITTED'
| 'UNKNOWN'
| 'UNLOGGED'
| 'UNSET'
| 'UNSPLIT'
| 'UNTIL'
| 'UPDATE'
Expand Down Expand Up @@ -2473,6 +2474,8 @@ alter_default_privileges_target_object ::=
alter_changefeed_cmd ::=
'ADD' changefeed_targets
| 'DROP' changefeed_targets
| 'SET' kv_option_list
| 'UNSET' name_list

alter_backup_cmd ::=
'ADD' backup_kms
Expand Down
1 change: 0 additions & 1 deletion pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ go_library(
"//pkg/sql/execinfra",
"//pkg/sql/execinfrapb",
"//pkg/sql/flowinfra",
"//pkg/sql/parser",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/pgwire/pgnotice",
Expand Down
246 changes: 164 additions & 82 deletions pkg/ccl/changefeedccl/alter_changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,17 @@ package changefeedccl
import (
"context"

"github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupresolver"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/resolver"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand All @@ -30,11 +32,6 @@ func init() {
sql.AddPlanHook("alter changefeed", alterChangefeedPlanHook)
}

type alterChangefeedOpts struct {
AddTargets []tree.TargetList
DropTargets []tree.TargetList
}

// alterChangefeedPlanHook implements sql.PlanHookFn.
func alterChangefeedPlanHook(
ctx context.Context, stmt tree.Statement, p sql.PlanHookState,
Expand Down Expand Up @@ -67,7 +64,7 @@ func alterChangefeedPlanHook(
return err
}

details, ok := job.Details().(jobspb.ChangefeedDetails)
prevDetails, ok := job.Details().(jobspb.ChangefeedDetails)
if !ok {
return errors.Errorf(`job %d is not changefeed job`, jobID)
}
Expand All @@ -76,116 +73,188 @@ func alterChangefeedPlanHook(
return errors.Errorf(`job %d is not paused`, jobID)
}

var opts alterChangefeedOpts
for _, cmd := range alterChangefeedStmt.Cmds {
switch v := cmd.(type) {
case *tree.AlterChangefeedAddTarget:
opts.AddTargets = append(opts.AddTargets, v.Targets)
case *tree.AlterChangefeedDropTarget:
opts.DropTargets = append(opts.DropTargets, v.Targets)
// this CREATE CHANGEFEED node will be used to update the existing changefeed
newChangefeedStmt := &tree.CreateChangefeed{
SinkURI: tree.NewDString(prevDetails.SinkURI),
}

optionsMap := make(map[string]tree.KVOption, len(prevDetails.Opts))

// pull the options that are set for the existing changefeed
for key, value := range prevDetails.Opts {
// There are some options (e.g. topics) that we set during the creation of
// a changefeed, but we do not allow these options to be set by the user.
// Hence, we can not include these options in our new CREATE CHANGEFEED
// statement.
if _, ok := changefeedbase.ChangefeedOptionExpectValues[key]; !ok {
continue
}
existingOpt := tree.KVOption{Key: tree.Name(key)}
if len(value) > 0 {
existingOpt.Value = tree.NewDString(value)
}
optionsMap[key] = existingOpt
}

var initialHighWater hlc.Timestamp
statementTime := hlc.Timestamp{
WallTime: p.ExtendedEvalContext().GetStmtTimestamp().UnixNano(),
}

if opts.AddTargets != nil {
var targetDescs []catalog.Descriptor

for _, targetList := range opts.AddTargets {
descs, err := getTableDescriptors(ctx, p, &targetList, statementTime, initialHighWater)
if err != nil {
return err
}
targetDescs = append(targetDescs, descs...)
}
allDescs, err := backupresolver.LoadAllDescs(ctx, p.ExecCfg(), statementTime)
if err != nil {
return err
}
descResolver, err := backupresolver.NewDescriptorResolver(allDescs)
if err != nil {
return err
}

newTargets, newTables, err := getTargetsAndTables(ctx, p, targetDescs, details.Opts)
if err != nil {
return err
}
// add old targets
for id, table := range details.Tables {
newTables[id] = table
}
details.Tables = newTables
details.TargetSpecifications = append(details.TargetSpecifications, newTargets...)
newDescs := make(map[descpb.ID]*tree.UnresolvedName)

for _, target := range AllTargets(prevDetails) {
desc := descResolver.DescByID[target.TableID]
newDescs[target.TableID] = tree.NewUnresolvedName(desc.GetName())
}

if opts.DropTargets != nil {
var targetDescs []catalog.Descriptor
for _, cmd := range alterChangefeedStmt.Cmds {
switch v := cmd.(type) {
case *tree.AlterChangefeedAddTarget:
for _, targetPattern := range v.Targets.Tables {
targetName, err := getTargetName(targetPattern)
if err != nil {
return err
}
found, _, desc, err := resolver.ResolveExisting(
ctx,
targetName.ToUnresolvedObjectName(),
descResolver,
tree.ObjectLookupFlags{},
p.CurrentDatabase(),
p.CurrentSearchPath(),
)
if err != nil {
return err
}
if !found {
return pgerror.Newf(pgcode.InvalidParameterValue, `target %q does not exist`, tree.ErrString(targetPattern))
}
newDescs[desc.GetID()] = tree.NewUnresolvedName(desc.GetName())
}
case *tree.AlterChangefeedDropTarget:
for _, targetPattern := range v.Targets.Tables {
targetName, err := getTargetName(targetPattern)
if err != nil {
return err
}
found, _, desc, err := resolver.ResolveExisting(
ctx,
targetName.ToUnresolvedObjectName(),
descResolver,
tree.ObjectLookupFlags{},
p.CurrentDatabase(),
p.CurrentSearchPath(),
)
if err != nil {
return err
}
if !found {
return pgerror.Newf(pgcode.InvalidParameterValue, `target %q does not exist`, tree.ErrString(targetPattern))
}
delete(newDescs, desc.GetID())
}
case *tree.AlterChangefeedSetOptions:
optsFn, err := p.TypeAsStringOpts(ctx, v.Options, changefeedbase.ChangefeedOptionExpectValues)
if err != nil {
return err
}

for _, targetList := range opts.DropTargets {
descs, err := getTableDescriptors(ctx, p, &targetList, statementTime, initialHighWater)
opts, err := optsFn()
if err != nil {
return err
}
targetDescs = append(targetDescs, descs...)
}

for _, desc := range targetDescs {
if table, isTable := desc.(catalog.TableDescriptor); isTable {
if err := p.CheckPrivilege(ctx, desc, privilege.SELECT); err != nil {
return err
for key, value := range opts {
if _, ok := changefeedbase.ChangefeedOptionExpectValues[key]; !ok {
return pgerror.Newf(pgcode.InvalidParameterValue, `invalid option %q`, key)
}
if _, ok := changefeedbase.AlterChangefeedUnsupportedOptions[key]; ok {
return pgerror.Newf(pgcode.InvalidParameterValue, `cannot alter option %q`, key)
}
delete(details.Tables, table.GetID())
opt := tree.KVOption{Key: tree.Name(key)}
if len(value) > 0 {
opt.Value = tree.NewDString(value)
}
optionsMap[key] = opt
}
}

newTargetSpecifications := make([]jobspb.ChangefeedTargetSpecification, len(details.TargetSpecifications)-len(opts.DropTargets))
for _, ts := range details.TargetSpecifications {
if _, stillThere := details.Tables[ts.TableID]; stillThere {
newTargetSpecifications = append(newTargetSpecifications, ts)
case *tree.AlterChangefeedUnsetOptions:
optKeys := v.Options.ToStrings()
for _, key := range optKeys {
if _, ok := changefeedbase.ChangefeedOptionExpectValues[key]; !ok {
return pgerror.Newf(pgcode.InvalidParameterValue, `invalid option %q`, key)
}
if _, ok := changefeedbase.AlterChangefeedUnsupportedOptions[key]; ok {
return pgerror.Newf(pgcode.InvalidParameterValue, `cannot alter option %q`, key)
}
delete(optionsMap, key)
}
}
details.TargetSpecifications = newTargetSpecifications
}

if len(newDescs) == 0 {
return pgerror.Newf(pgcode.InvalidParameterValue, "cannot drop all targets for changefeed job %d", jobID)
}

if len(details.Tables) == 0 {
return errors.Errorf("cannot drop all targets for changefeed job %d", jobID)
for _, targetName := range newDescs {
newChangefeedStmt.Targets.Tables = append(newChangefeedStmt.Targets.Tables, targetName)
}

if err := validateSink(ctx, p, jobID, details, details.Opts); err != nil {
return err
for _, val := range optionsMap {
newChangefeedStmt.Options = append(newChangefeedStmt.Options, val)
}

oldStmt, err := parser.ParseOne(job.Payload().Description)
sinkURIFn, err := p.TypeAsString(ctx, newChangefeedStmt.SinkURI, `ALTER CHANGEFEED`)
if err != nil {
return err
}
oldChangefeedStmt, ok := oldStmt.AST.(*tree.CreateChangefeed)
if !ok {
return errors.Errorf(`could not parse create changefeed statement for job %d`, jobID)
}

var targets tree.TargetList
for _, target := range details.Tables {
targetName := tree.MakeTableNameFromPrefix(tree.ObjectNamePrefix{}, tree.Name(target.StatementTimeName))
targets.Tables = append(targets.Tables, &targetName)
optsFn, err := p.TypeAsStringOpts(ctx, newChangefeedStmt.Options, changefeedbase.ChangefeedOptionExpectValues)
if err != nil {
return err
}

oldChangefeedStmt.Targets = targets
jobDescription := tree.AsString(oldChangefeedStmt)

newPayload := job.Payload()
newPayload.Description = jobDescription
newPayload.Details = jobspb.WrapPayloadDetails(details)
sinkURI, err := sinkURIFn()
if err != nil {
return err
}

finalDescs, err := getTableDescriptors(ctx, p, &targets, statementTime, initialHighWater)
opts, err := optsFn()
if err != nil {
return err
}

newPayload.DescriptorIDs = func() (sqlDescIDs []descpb.ID) {
for _, desc := range finalDescs {
sqlDescIDs = append(sqlDescIDs, desc.GetID())
}
return sqlDescIDs
}()
jobRecord, err := createChangefeedJobRecord(
ctx,
p,
newChangefeedStmt,
sinkURI,
opts,
jobID,
``,
)
if err != nil {
return errors.Wrap(err, `failed to alter changefeed`)
}

newDetails := jobRecord.Details.(jobspb.ChangefeedDetails)

// We need to persist the statement time that was generated during the
// creation of the changefeed
newDetails.StatementTime = prevDetails.StatementTime

newPayload := job.Payload()
newPayload.Details = jobspb.WrapPayloadDetails(newDetails)
newPayload.Description = jobRecord.Description
newPayload.DescriptorIDs = jobRecord.DescriptorIDs

err = p.ExecCfg().JobRegistry.UpdateJobWithTxn(ctx, jobID, p.ExtendedEvalContext().Txn, lockForUpdate, func(
txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater,
Expand All @@ -203,11 +272,24 @@ func alterChangefeedPlanHook(
return ctx.Err()
case resultsCh <- tree.Datums{
tree.NewDInt(tree.DInt(jobID)),
tree.NewDString(jobDescription),
tree.NewDString(jobRecord.Description),
}:
return nil
}
}

return fn, header, nil, false, nil
}

func getTargetName(targetPattern tree.TablePattern) (*tree.TableName, error) {
pattern, err := targetPattern.NormalizeTablePattern()
if err != nil {
return nil, err
}
targetName, ok := pattern.(*tree.TableName)
if !ok {
return nil, errors.Errorf(`CHANGEFEED cannot target %q`, tree.AsString(targetPattern))
}

return targetName, nil
}
Loading

0 comments on commit 949af29

Please sign in to comment.