Skip to content

Commit

Permalink
changefeedccl, backupresolver: refactor to hold on to mapping of target to descriptor
Browse files Browse the repository at this point in the history

Changefeed statements need to resolve a bunch of table names at once,
but unlike backups and grants they need to know which returned
descriptor corresponded to which input because they (now) take
target-specific options. We were reconstructing this awkwardly on
the calling side. This PR adds an optional parameter to the
backupresolver method being used so that it can track which
descriptor belongs to which input.

I'm probably being overly polite by making this optional,
but hey, it is a little extra memory footprint and not my package.

Release note: None
  • Loading branch information
HonoreDB committed Apr 2, 2022
1 parent 0d10296 commit 5a4b7fc
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 81 deletions.
37 changes: 35 additions & 2 deletions pkg/ccl/backupccl/backupresolver/targets.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ func NewDescriptorResolver(descs []catalog.Descriptor) (*DescriptorResolver, err
// session database) or if one of its tables matches the targets. All expanded
// DBs, via either `foo.*` or `DATABASE foo` are noted, as are those explicitly
// named as DBs (e.g. with `DATABASE foo`, not `foo.*`). These distinctions are
// used e.g. by RESTORE.
// used e.g. by RESTORE. tablePatternMap will be populated if non-nil.
//
// This is guaranteed to not return duplicates.
func DescriptorsMatchingTargets(
Expand All @@ -316,6 +316,7 @@ func DescriptorsMatchingTargets(
descriptors []catalog.Descriptor,
targets tree.TargetList,
asOf hlc.Timestamp,
tablePatternMap map[tree.TablePattern]catalog.Descriptor,
) (DescriptorsMatched, error) {
ret := DescriptorsMatched{}

Expand Down Expand Up @@ -420,6 +421,7 @@ func DescriptorsMatchingTargets(
alreadyRequestedSchemasByDBs := make(map[descpb.ID]map[string]struct{})
for _, pattern := range targets.Tables {
var err error
origPat := pattern
pattern, err = pattern.NormalizeTablePattern()
if err != nil {
return ret, err
Expand Down Expand Up @@ -459,6 +461,10 @@ func DescriptorsMatchingTargets(
return ret, doesNotExistErr
}

if tablePatternMap != nil {
tablePatternMap[origPat] = descI
}

// If the parent database is not requested already, request it now.
parentID := tableDesc.GetParentID()
if _, ok := alreadyRequestedDBs[parentID]; !ok {
Expand Down Expand Up @@ -536,6 +542,13 @@ func DescriptorsMatchingTargets(
}
scMap := alreadyRequestedSchemasByDBs[dbID]
scMap[p.Schema()] = struct{}{}
if tablePatternMap != nil {
tablePatternMap[origPat] = prefix.Schema
}
} else {
if tablePatternMap != nil {
tablePatternMap[origPat] = prefix.Database
}
}
default:
return ret, errors.Errorf("unknown pattern %T: %+v", pattern, pattern)
Expand Down Expand Up @@ -645,7 +658,7 @@ func ResolveTargetsToDescriptors(

var matched DescriptorsMatched
if matched, err = DescriptorsMatchingTargets(ctx,
p.CurrentDatabase(), p.CurrentSearchPath(), allDescs, *targets, endTime); err != nil {
p.CurrentDatabase(), p.CurrentSearchPath(), allDescs, *targets, endTime, nil /*tablePatternMap*/); err != nil {
return nil, nil, err
}

Expand All @@ -657,3 +670,23 @@ func ResolveTargetsToDescriptors(

return matched.Descs, matched.ExpandedDB, nil
}

// MapTablePatternsToDescriptors resolves the given table patterns against
// all descriptors as of endTime, performing the same name resolution as
// ResolveTargetsToDescriptors but returning the result in a different
// format: a map of TablePattern -> Descriptor.
func MapTablePatternsToDescriptors(
	ctx context.Context, p sql.PlanHookState, endTime hlc.Timestamp, tablePatterns tree.TablePatterns,
) (map[tree.TablePattern]catalog.Descriptor, error) {
	descs, loadErr := LoadAllDescs(ctx, p.ExecCfg(), endTime)
	if loadErr != nil {
		return nil, loadErr
	}

	// DescriptorsMatchingTargets fills patternToDesc as it resolves each
	// pattern; the DescriptorsMatched return value is not needed here.
	patternToDesc := make(map[tree.TablePattern]catalog.Descriptor, len(tablePatterns))
	if _, err := DescriptorsMatchingTargets(
		ctx, p.CurrentDatabase(), p.CurrentSearchPath(), descs,
		tree.TargetList{Tables: tablePatterns}, endTime, patternToDesc,
	); err != nil {
		return nil, err
	}
	return patternToDesc, nil
}
10 changes: 9 additions & 1 deletion pkg/ccl/backupccl/backupresolver/targets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,10 @@ func TestDescriptorsMatchingTargets(t *testing.T) {
}
targets := stmt.AST.(*tree.Grant).Targets

tablePatternMap := make(map[tree.TablePattern]catalog.Descriptor)

matched, err := DescriptorsMatchingTargets(context.Background(),
test.sessionDatabase, searchPath, descriptors, targets, hlc.Timestamp{})
test.sessionDatabase, searchPath, descriptors, targets, hlc.Timestamp{}, tablePatternMap)
if test.err != "" {
if !testutils.IsError(err, test.err) {
t.Fatalf("expected error matching '%v', but got '%v'", test.err, err)
Expand All @@ -282,6 +284,12 @@ func TestDescriptorsMatchingTargets(t *testing.T) {
if !reflect.DeepEqual(test.expectedDBs, matchedDBNames) {
t.Fatalf("expected %q got %q", test.expectedDBs, matchedDBNames)
}
for _, p := range targets.Tables {
_, ok := tablePatternMap[p]
if !ok {
t.Fatalf("no entry in %q for %q", tablePatternMap, p)
}
}
}
})
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/targets.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ func selectTargets(
}

matched, err := backupresolver.DescriptorsMatchingTargets(ctx,
p.CurrentDatabase(), p.CurrentSearchPath(), allDescs, targets, asOf)
p.CurrentDatabase(), p.CurrentSearchPath(), allDescs, targets, asOf, nil)
if err != nil {
return nil, nil, nil, err
}
Expand Down
12 changes: 4 additions & 8 deletions pkg/ccl/changefeedccl/alter_changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -639,16 +639,12 @@ func fetchSpansForDescs(
statementTime hlc.Timestamp,
descs []catalog.Descriptor,
) ([]roachpb.Span, error) {
_, tables, err := getTargetsAndTables(ctx, p, descs, tree.ChangefeedTargets{}, opts)
if err != nil {
return nil, err
}

details := jobspb.ChangefeedDetails{
Tables: tables,
targets := make([]jobspb.ChangefeedTargetSpecification, len(descs))
for i, d := range descs {
targets[i] = jobspb.ChangefeedTargetSpecification{TableID: d.GetID()}
}

spans, err := fetchSpansForTargets(ctx, p.ExecCfg(), AllTargets(details), statementTime)
spans, err := fetchSpansForTargets(ctx, p.ExecCfg(), targets, statementTime)

return spans, err
}
85 changes: 16 additions & 69 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,10 +299,13 @@ func createChangefeedJobRecord(
endTime = statementTime
}

targetList := uniqueTableNames(changefeedStmt.Targets)
tableOnlyTargetList := tree.TargetList{}
for _, t := range changefeedStmt.Targets {
tableOnlyTargetList.Tables = append(tableOnlyTargetList.Tables, t.TableName)
}

// This grabs table descriptors once to get their ids.
targetDescs, err := getTableDescriptors(ctx, p, &targetList, statementTime, initialHighWater)
targetDescs, err := getTableDescriptors(ctx, p, &tableOnlyTargetList, statementTime, initialHighWater)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -499,7 +502,7 @@ func getTableDescriptors(
targets *tree.TargetList,
statementTime hlc.Timestamp,
initialHighWater hlc.Timestamp,
) ([]catalog.Descriptor, error) {
) (map[tree.TablePattern]catalog.Descriptor, error) {
// For now, disallow targeting a database or wildcard table selection.
// Getting it right as tables enter and leave the set over time is
// tricky.
Expand All @@ -517,9 +520,8 @@ func getTableDescriptors(
}
}

// This grabs table descriptors once to get their ids.
targetDescs, _, err := backupresolver.ResolveTargetsToDescriptors(
ctx, p, statementTime, targets)
targetDescs, err := backupresolver.MapTablePatternsToDescriptors(
ctx, p, statementTime, targets.Tables)
if err != nil {
var m *backupresolver.MissingTableErr
if errors.As(err, &m) {
Expand All @@ -540,7 +542,7 @@ func getTableDescriptors(
func getTargetsAndTables(
ctx context.Context,
p sql.PlanHookState,
targetDescs []catalog.Descriptor,
targetDescs map[tree.TablePattern]catalog.Descriptor,
rawTargets tree.ChangefeedTargets,
opts map[string]string,
) ([]jobspb.ChangefeedTargetSpecification, jobspb.ChangefeedTargets, error) {
Expand All @@ -563,9 +565,13 @@ func getTargetsAndTables(
}
}
for i, ct := range rawTargets {
td, err := matchDescriptorToTablePattern(ctx, p, targetDescs, ct.TableName)
if err != nil {
return nil, nil, err
desc, ok := targetDescs[ct.TableName]
if !ok {
return nil, nil, errors.Newf("could not match %v to a fetched descriptor. fetched were %v", ct.TableName, targetDescs)
}
td, ok := desc.(catalog.TableDescriptor)
if !ok {
return nil, nil, errors.Errorf(`CHANGEFEED cannot target %s`, tree.AsString(&ct))
}
typ := jobspb.ChangefeedTargetSpecification_PRIMARY_FAMILY_ONLY
if ct.FamilyName != "" {
Expand All @@ -585,49 +591,6 @@ func getTargetsAndTables(
return targets, tables, nil
}

// TODO (zinger): This is redoing work already done in backupresolver. Have backupresolver
// return a map of descriptors to table patterns so this isn't necessary and is less fragile.
//
// matchDescriptorToTablePattern finds, among descs, the table descriptor
// named by the pattern t, resolving partially-qualified names against the
// session's current database. It returns an error if t does not normalize
// to a plain table name or if no descriptor matches.
func matchDescriptorToTablePattern(
	ctx context.Context, p sql.PlanHookState, descs []catalog.Descriptor, t tree.TablePattern,
) (catalog.TableDescriptor, error) {
	pattern, err := t.NormalizeTablePattern()
	if err != nil {
		return nil, err
	}
	// Only concrete table names can be matched; wildcard patterns are
	// rejected here.
	name, ok := pattern.(*tree.TableName)
	if !ok {
		return nil, errors.Newf("%v is not a TableName", pattern)
	}
	for _, desc := range descs {
		// Skip non-table descriptors (databases, schemas, types).
		tbl, ok := desc.(catalog.TableDescriptor)
		if !ok {
			continue
		}
		// Cheap filter first: the unqualified object name must match.
		if tbl.GetName() != string(name.ObjectName) {
			continue
		}
		// Fully qualify the candidate so its catalog/schema can be compared
		// against however much qualification the input name carried.
		qtn, err := getQualifiedTableNameObj(ctx, p.ExecCfg(), p.ExtendedEvalContext().Txn, tbl)
		if err != nil {
			return nil, err
		}
		switch name.ToUnresolvedObjectName().NumParts {
		case 1:
			// Bare table name: match only tables in the current database.
			if qtn.CatalogName == tree.Name(p.CurrentDatabase()) {
				return tbl, nil
			}
		case 2:
			// Two-part name: the first part (held in SchemaName before full
			// resolution) is compared against the descriptor's catalog, i.e.
			// `db.tbl` is treated as database.table rather than schema.table.
			// NOTE(review): confirm this matches the resolver's semantics.
			if qtn.CatalogName == name.SchemaName {
				return tbl, nil
			}
		case 3:
			// Fully qualified db.schema.table: both qualifiers must match.
			if qtn.CatalogName == name.CatalogName && qtn.SchemaName == name.SchemaName {
				return tbl, nil
			}
		}
	}
	return nil, errors.Newf("could not match %v to a fetched descriptor", t)
}

func validateSink(
ctx context.Context,
p sql.PlanHookState,
Expand Down Expand Up @@ -1153,19 +1116,3 @@ func AllTargets(cd jobspb.ChangefeedDetails) (targets []jobspb.ChangefeedTargetS
}
return
}

// uniqueTableNames builds a TargetList whose Tables are the distinct table
// names appearing in cts, deduplicated by their string form. The order of
// the result is unspecified since it comes from map iteration.
func uniqueTableNames(cts tree.ChangefeedTargets) tree.TargetList {
	seen := make(map[string]tree.TablePattern)
	for _, target := range cts {
		seen[target.TableName.String()] = target.TableName
	}

	var result tree.TargetList
	for _, pattern := range seen {
		result.Tables = append(result.Tables, pattern)
	}
	return result
}

0 comments on commit 5a4b7fc

Please sign in to comment.