Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changefeedccl, backupresolver: refactor to hold on to mapping of target to descriptor #79260

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -743,7 +743,7 @@ func backupPlanHook(
switch backupStmt.Coverage() {
case tree.RequestedDescriptors:
var err error
targetDescs, completeDBs, err = backupresolver.ResolveTargetsToDescriptors(ctx, p, endTime, backupStmt.Targets)
targetDescs, completeDBs, _, err = backupresolver.ResolveTargetsToDescriptors(ctx, p, endTime, backupStmt.Targets)
if err != nil {
return errors.Wrap(err, "failed to resolve targets specified in the BACKUP stmt")
}
Expand Down
24 changes: 18 additions & 6 deletions pkg/ccl/backupccl/backupresolver/targets.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ type DescriptorsMatched struct {

// Explicitly requested DBs (e.g. DATABASE a).
RequestedDBs []catalog.DatabaseDescriptor

// A map of explicitly requested TablePatterns to their resolutions.
DescsByTablePattern map[tree.TablePattern]catalog.Descriptor
}

// MissingTableErr is a custom error type for Missing Table when resolver.ResolveExisting()
Expand Down Expand Up @@ -308,7 +311,8 @@ func NewDescriptorResolver(descs []catalog.Descriptor) (*DescriptorResolver, err
// named as DBs (e.g. with `DATABASE foo`, not `foo.*`). These distinctions are
// used e.g. by RESTORE.
//
// This is guaranteed to not return duplicates.
// This is guaranteed to not return duplicates, other than in DescsByTablePattern,
// which will contain a descriptor for every element of targets.Tables.
func DescriptorsMatchingTargets(
ctx context.Context,
currentDatabase string,
Expand All @@ -317,7 +321,9 @@ func DescriptorsMatchingTargets(
targets tree.TargetList,
asOf hlc.Timestamp,
) (DescriptorsMatched, error) {
ret := DescriptorsMatched{}
ret := DescriptorsMatched{
DescsByTablePattern: make(map[tree.TablePattern]catalog.Descriptor, len(targets.Tables)),
}

r, err := NewDescriptorResolver(descriptors)
if err != nil {
Expand Down Expand Up @@ -420,6 +426,7 @@ func DescriptorsMatchingTargets(
alreadyRequestedSchemasByDBs := make(map[descpb.ID]map[string]struct{})
for _, pattern := range targets.Tables {
var err error
origPat := pattern
pattern, err = pattern.NormalizeTablePattern()
if err != nil {
return ret, err
Expand Down Expand Up @@ -459,6 +466,8 @@ func DescriptorsMatchingTargets(
return ret, doesNotExistErr
}

ret.DescsByTablePattern[origPat] = descI

// If the parent database is not requested already, request it now.
parentID := tableDesc.GetParentID()
if _, ok := alreadyRequestedDBs[parentID]; !ok {
Expand Down Expand Up @@ -536,6 +545,9 @@ func DescriptorsMatchingTargets(
}
scMap := alreadyRequestedSchemasByDBs[dbID]
scMap[p.Schema()] = struct{}{}
ret.DescsByTablePattern[origPat] = prefix.Schema
} else {
ret.DescsByTablePattern[origPat] = prefix.Database
}
default:
return ret, errors.Errorf("unknown pattern %T: %+v", pattern, pattern)
Expand Down Expand Up @@ -637,16 +649,16 @@ func LoadAllDescs(
// TODO(ajwerner): adopt the collection here.
func ResolveTargetsToDescriptors(
ctx context.Context, p sql.PlanHookState, endTime hlc.Timestamp, targets *tree.TargetList,
) ([]catalog.Descriptor, []descpb.ID, error) {
) ([]catalog.Descriptor, []descpb.ID, map[tree.TablePattern]catalog.Descriptor, error) {
allDescs, err := LoadAllDescs(ctx, p.ExecCfg(), endTime)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}

var matched DescriptorsMatched
if matched, err = DescriptorsMatchingTargets(ctx,
p.CurrentDatabase(), p.CurrentSearchPath(), allDescs, *targets, endTime); err != nil {
return nil, nil, err
return nil, nil, nil, err
}

// This sorting was originally required to support interleaves.
Expand All @@ -655,5 +667,5 @@ func ResolveTargetsToDescriptors(
// that certain tests rely on.
sort.Slice(matched.Descs, func(i, j int) bool { return matched.Descs[i].GetID() < matched.Descs[j].GetID() })

return matched.Descs, matched.ExpandedDB, nil
return matched.Descs, matched.ExpandedDB, matched.DescsByTablePattern, nil
}
6 changes: 6 additions & 0 deletions pkg/ccl/backupccl/backupresolver/targets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,12 @@ func TestDescriptorsMatchingTargets(t *testing.T) {
if !reflect.DeepEqual(test.expectedDBs, matchedDBNames) {
t.Fatalf("expected %q got %q", test.expectedDBs, matchedDBNames)
}
for _, p := range targets.Tables {
_, ok := matched.DescsByTablePattern[p]
if !ok {
t.Fatalf("no entry in %q for %q", matched.DescsByTablePattern, p)
}
}
}
})
}
Expand Down
12 changes: 4 additions & 8 deletions pkg/ccl/changefeedccl/alter_changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -639,16 +639,12 @@ func fetchSpansForDescs(
statementTime hlc.Timestamp,
descs []catalog.Descriptor,
) ([]roachpb.Span, error) {
_, tables, err := getTargetsAndTables(ctx, p, descs, tree.ChangefeedTargets{}, opts)
if err != nil {
return nil, err
}

details := jobspb.ChangefeedDetails{
Tables: tables,
targets := make([]jobspb.ChangefeedTargetSpecification, len(descs))
for i, d := range descs {
targets[i] = jobspb.ChangefeedTargetSpecification{TableID: d.GetID()}
}

spans, err := fetchSpansForTargets(ctx, p.ExecCfg(), AllTargets(details), statementTime)
spans, err := fetchSpansForTargets(ctx, p.ExecCfg(), targets, statementTime)

return spans, err
}
84 changes: 15 additions & 69 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,10 +299,13 @@ func createChangefeedJobRecord(
endTime = statementTime
}

targetList := uniqueTableNames(changefeedStmt.Targets)
tableOnlyTargetList := tree.TargetList{}
for _, t := range changefeedStmt.Targets {
tableOnlyTargetList.Tables = append(tableOnlyTargetList.Tables, t.TableName)
}

// This grabs table descriptors once to get their ids.
targetDescs, err := getTableDescriptors(ctx, p, &targetList, statementTime, initialHighWater)
targetDescs, err := getTableDescriptors(ctx, p, &tableOnlyTargetList, statementTime, initialHighWater)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -499,7 +502,7 @@ func getTableDescriptors(
targets *tree.TargetList,
statementTime hlc.Timestamp,
initialHighWater hlc.Timestamp,
) ([]catalog.Descriptor, error) {
) (map[tree.TablePattern]catalog.Descriptor, error) {
// For now, disallow targeting a database or wildcard table selection.
// Getting it right as tables enter and leave the set over time is
// tricky.
Expand All @@ -517,9 +520,7 @@ func getTableDescriptors(
}
}

// This grabs table descriptors once to get their ids.
targetDescs, _, err := backupresolver.ResolveTargetsToDescriptors(
ctx, p, statementTime, targets)
_, _, targetDescs, err := backupresolver.ResolveTargetsToDescriptors(ctx, p, statementTime, targets)
if err != nil {
var m *backupresolver.MissingTableErr
if errors.As(err, &m) {
Expand All @@ -540,7 +541,7 @@ func getTableDescriptors(
func getTargetsAndTables(
ctx context.Context,
p sql.PlanHookState,
targetDescs []catalog.Descriptor,
targetDescs map[tree.TablePattern]catalog.Descriptor,
rawTargets tree.ChangefeedTargets,
opts map[string]string,
) ([]jobspb.ChangefeedTargetSpecification, jobspb.ChangefeedTargets, error) {
Expand All @@ -563,9 +564,13 @@ func getTargetsAndTables(
}
}
for i, ct := range rawTargets {
td, err := matchDescriptorToTablePattern(ctx, p, targetDescs, ct.TableName)
if err != nil {
return nil, nil, err
desc, ok := targetDescs[ct.TableName]
if !ok {
return nil, nil, errors.Newf("could not match %v to a fetched descriptor. fetched were %v", ct.TableName, targetDescs)
}
td, ok := desc.(catalog.TableDescriptor)
if !ok {
return nil, nil, errors.Errorf(`CHANGEFEED cannot target %s`, tree.AsString(&ct))
}
typ := jobspb.ChangefeedTargetSpecification_PRIMARY_FAMILY_ONLY
if ct.FamilyName != "" {
Expand All @@ -585,49 +590,6 @@ func getTargetsAndTables(
return targets, tables, nil
}

// TODO (zinger): This is redoing work already done in backupresolver. Have backupresolver
// return a map of descriptors to table patterns so this isn't necessary and is less fragile.
// matchDescriptorToTablePattern finds, among the already-fetched descriptors
// in descs, the table descriptor that the table pattern t refers to, resolving
// partially-qualified names against the session's current database.
//
// It returns an error if t does not normalize to a plain table name, if
// qualifying a candidate's name fails, or if no descriptor matches.
//
// NOTE(review): this linearly scans descs for each call; callers invoking it
// per-target pay O(targets × descs).
func matchDescriptorToTablePattern(
	ctx context.Context, p sql.PlanHookState, descs []catalog.Descriptor, t tree.TablePattern,
) (catalog.TableDescriptor, error) {
	// Normalize so that e.g. quoting/case differences collapse to one form.
	pattern, err := t.NormalizeTablePattern()
	if err != nil {
		return nil, err
	}
	// Only concrete table names can be matched here; wildcard patterns
	// (e.g. db.*) are rejected.
	name, ok := pattern.(*tree.TableName)
	if !ok {
		return nil, errors.Newf("%v is not a TableName", pattern)
	}
	for _, desc := range descs {
		// Skip non-table descriptors (databases, schemas, types).
		tbl, ok := desc.(catalog.TableDescriptor)
		if !ok {
			continue
		}
		// Cheap pre-filter on the unqualified object name before doing the
		// more expensive full qualification below.
		if tbl.GetName() != string(name.ObjectName) {
			continue
		}
		// Fully qualify the candidate so its catalog/schema parts can be
		// compared against however much qualification the pattern supplied.
		qtn, err := getQualifiedTableNameObj(ctx, p.ExecCfg(), p.ExtendedEvalContext().Txn, tbl)
		if err != nil {
			return nil, err
		}
		switch name.ToUnresolvedObjectName().NumParts {
		case 1:
			// Bare name "t": match only within the current database.
			if qtn.CatalogName == tree.Name(p.CurrentDatabase()) {
				return tbl, nil
			}
		case 2:
			// Two-part name "a.t": the first part is stored in SchemaName but
			// is compared against the catalog (database) name here —
			// presumably treating "a" as a database; verify against resolver
			// conventions for 2-part names.
			if qtn.CatalogName == name.SchemaName {
				return tbl, nil
			}
		case 3:
			// Fully-qualified "db.schema.t": both parts must match.
			if qtn.CatalogName == name.CatalogName && qtn.SchemaName == name.SchemaName {
				return tbl, nil
			}
		}
	}
	return nil, errors.Newf("could not match %v to a fetched descriptor", t)
}

func validateSink(
ctx context.Context,
p sql.PlanHookState,
Expand Down Expand Up @@ -1153,19 +1115,3 @@ func AllTargets(cd jobspb.ChangefeedDetails) (targets []jobspb.ChangefeedTargetS
}
return
}

// uniqueTableNames builds a TargetList containing each distinct table name
// appearing in cts exactly once. Distinctness is keyed on the rendered
// (String) form of the name; ordering of the result is unspecified since it
// comes from map iteration.
func uniqueTableNames(cts tree.ChangefeedTargets) tree.TargetList {
	seen := make(map[string]tree.TablePattern, len(cts))
	for _, target := range cts {
		seen[target.TableName.String()] = target.TableName
	}

	var result tree.TargetList
	for _, name := range seen {
		result.Tables = append(result.Tables, name)
	}
	return result
}