Skip to content

Commit

Permalink
Merge #93461
Browse files Browse the repository at this point in the history
93461: descs: fix incorrect namespace shadowing r=postamar a=postamar

Recent work towards #88571 has made the descriptors collection aware of
namespace operations. This awareness erroneously did not extend to
operations involving cached scans of the namespace table.

Fixes #93002.

Release note: None

Co-authored-by: Marius Posta <[email protected]>
  • Loading branch information
craig[bot] and Marius Posta committed Dec 13, 2022
2 parents 99e7bd7 + 5519206 commit 966463a
Show file tree
Hide file tree
Showing 26 changed files with 260 additions and 319 deletions.
2 changes: 1 addition & 1 deletion pkg/sql/alter_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (n *alterSchemaNode) startExec(params runParams) error {

lookupFlags := tree.CommonLookupFlags{Required: true, AvoidLeased: true}
if err := maybeFailOnDependentDescInRename(
params.ctx, params.p, n.db, n.desc.GetName(), lookupFlags, catalog.Schema,
params.ctx, params.p, n.db, n.desc, lookupFlags, catalog.Schema,
); err != nil {
return err
}
Expand Down
1 change: 0 additions & 1 deletion pkg/sql/catalog/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ load("//pkg/testutils/buildutil:buildutil.bzl", "disallowed_imports_test")
go_library(
name = "catalog",
srcs = [
"accessor.go",
"catalog.go",
"descriptor.go",
"descriptor_id_set.go",
Expand Down
55 changes: 0 additions & 55 deletions pkg/sql/catalog/accessor.go

This file was deleted.

98 changes: 55 additions & 43 deletions pkg/sql/catalog/descs/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,6 @@ type Collection struct {
sqlLivenessSession sqlliveness.Session
}

var _ catalog.Accessor = (*Collection)(nil)

// GetDeletedDescs returns the deleted descriptors of the collection.
func (tc *Collection) GetDeletedDescs() catalog.DescriptorIDSet {
return tc.deletedDescs
Expand Down Expand Up @@ -391,6 +389,24 @@ func (tc *Collection) markAsShadowedName(id descpb.ID) {
}] = struct{}{}
}

func (tc *Collection) isShadowedName(nameKey catalog.NameKey) bool {
var k descpb.NameInfo
switch t := nameKey.(type) {
case descpb.NameInfo:
k = t
case *descpb.NameInfo:
k = *t
default:
k = descpb.NameInfo{
ParentID: nameKey.GetParentID(),
ParentSchemaID: nameKey.GetParentSchemaID(),
Name: nameKey.GetName(),
}
}
_, ok := tc.shadowedNames[k]
return ok
}

// WriteCommentToBatch adds the comment changes to uncommitted layer and writes
// to the kv batch.
func (tc *Collection) WriteCommentToBatch(
Expand Down Expand Up @@ -615,7 +631,7 @@ func (tc *Collection) lookupDescriptorID(
return objInMemory.GetID(), nil
}
// Look up ID in storage if nothing was found in memory.
if _, ok := tc.shadowedNames[key]; ok {
if tc.isShadowedName(key) {
return descpb.InvalidID, nil
}
read, err := tc.cr.GetByNames(ctx, txn, []descpb.NameInfo{key})
Expand Down Expand Up @@ -692,7 +708,7 @@ func (tc *Collection) GetAllDescriptorsForDatabase(
var ids catalog.DescriptorIDSet
ids.Add(db.GetID())
_ = read.ForEachNamespaceEntry(func(e nstree.NamespaceEntry) error {
if e.GetParentID() != db.GetID() {
if e.GetParentID() != db.GetID() || tc.isShadowedName(e) {
return nil
}
if !strings.HasPrefix(e.GetName(), catconstants.PgTempSchemaName) &&
Expand Down Expand Up @@ -794,7 +810,9 @@ func (tc *Collection) GetAllDatabaseDescriptors(
}
var readIDs catalog.DescriptorIDSet
_ = read.ForEachDatabaseNamespaceEntry(func(e nstree.NamespaceEntry) error {
readIDs.Add(e.GetID())
if !tc.isShadowedName(e) {
readIDs.Add(e.GetID())
}
return nil
})
const isDescriptorRequired = true
Expand Down Expand Up @@ -889,7 +907,9 @@ func (tc *Collection) GetSchemasForDatabase(
ret[keys.PublicSchemaIDForBackup] = catconstants.PublicSchemaName
}
_ = read.ForEachSchemaNamespaceEntryInDatabase(db.GetID(), func(e nstree.NamespaceEntry) error {
ret[e.GetID()] = e.GetName()
if !tc.isShadowedName(e) {
ret[e.GetID()] = e.GetName()
}
return nil
})
_ = tc.uncommitted.iterateUncommittedByID(func(desc catalog.Descriptor) error {
Expand All @@ -907,51 +927,43 @@ func (tc *Collection) GetSchemasForDatabase(
return ret, nil
}

// GetObjectNamesAndIDs returns the names and IDs of all objects in a database and schema.
// GetObjectNamesAndIDs returns the names and IDs of all objects in a schema.
func (tc *Collection) GetObjectNamesAndIDs(
ctx context.Context,
txn *kv.Txn,
db catalog.DatabaseDescriptor,
scName string,
flags tree.DatabaseListFlags,
) (tree.TableNames, descpb.IDs, error) {
if ok, names, ds := tc.virtual.maybeGetObjectNamesAndIDs(
scName, db, flags,
); ok {
return names, ds, nil
}

schemaFlags := tree.SchemaLookupFlags{
Required: flags.Required,
AvoidLeased: flags.RequireMutable || flags.AvoidLeased,
IncludeDropped: flags.IncludeDropped,
IncludeOffline: flags.IncludeOffline,
}
schema, err := tc.getSchemaByName(ctx, txn, db, scName, schemaFlags)
if err != nil {
return nil, nil, err
}
if schema == nil { // required must have been false
return nil, nil, nil
ctx context.Context, txn *kv.Txn, db catalog.DatabaseDescriptor, sc catalog.SchemaDescriptor,
) (nstree.Catalog, error) {
var mc nstree.MutableCatalog
if sc.SchemaKind() == catalog.SchemaVirtual {
tc.virtual.forEachVirtualObject(sc, func(obj catalog.Descriptor) {
mc.UpsertNamespaceEntry(obj, obj.GetID(), hlc.Timestamp{})
})
return mc.Catalog, nil
}
read, err := tc.cr.ScanNamespaceForSchemaObjects(ctx, txn, db, schema)
read, err := tc.cr.ScanNamespaceForSchemaObjects(ctx, txn, db, sc)
if err != nil {
return nil, nil, err
return nstree.Catalog{}, err
}
var tableNames tree.TableNames
var tableIDs descpb.IDs
_ = read.ForEachNamespaceEntry(func(e nstree.NamespaceEntry) error {
if e.GetParentID() != db.GetID() || e.GetParentSchemaID() != schema.GetID() {
return nil
if !tc.isShadowedName(e) {
mc.UpsertNamespaceEntry(e, e.GetID(), e.GetMVCCTimestamp())
}
tn := tree.MakeTableNameWithSchema(tree.Name(db.GetName()), tree.Name(scName), tree.Name(e.GetName()))
tn.ExplicitCatalog = flags.ExplicitPrefix
tn.ExplicitSchema = flags.ExplicitPrefix
tableNames = append(tableNames, tn)
tableIDs = append(tableIDs, e.GetID())
return nil
})
return tableNames, tableIDs, nil
for _, iterator := range []func(func(catalog.Descriptor) error) error{
tc.uncommitted.iterateUncommittedByID,
tc.synthetic.iterateSyntheticByID,
} {
_ = iterator(func(desc catalog.Descriptor) error {
if desc.GetParentID() != db.GetID() || desc.GetParentSchemaID() != sc.GetID() {
return nil
}
if desc.Dropped() || desc.SkipNamespace() {
return nil
}
mc.UpsertNamespaceEntry(desc, desc.GetID(), desc.GetModificationTime())
return nil
})
}
return mc.Catalog, nil
}

// SetSyntheticDescriptors sets the provided descriptors as the synthetic
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/catalog/descs/descriptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,7 @@ func (tc *Collection) getNonVirtualDescriptorID(
}
lookupStoreCacheID := func() (continueOrHalt, descpb.ID, error) {
ni := descpb.NameInfo{ParentID: parentID, ParentSchemaID: parentSchemaID, Name: name}
if _, ok := tc.shadowedNames[ni]; ok {
if tc.isShadowedName(ni) {
return continueLookups, descpb.InvalidID, nil
}
if tc.cr.IsNameInCache(&ni) {
Expand Down Expand Up @@ -645,7 +645,7 @@ func (tc *Collection) getNonVirtualDescriptorID(
return haltLookups, descpb.InvalidID, nil
}
ni := descpb.NameInfo{ParentID: parentID, ParentSchemaID: parentSchemaID, Name: name}
if _, ok := tc.shadowedNames[ni]; ok {
if tc.isShadowedName(ni) {
return haltLookups, descpb.InvalidID, nil
}
read, err := tc.cr.GetByNames(ctx, txn, []descpb.NameInfo{ni})
Expand Down
26 changes: 7 additions & 19 deletions pkg/sql/catalog/descs/virtual_descriptors.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,26 +99,14 @@ func (tc virtualDescriptors) getSchemaByID(
}
}

func (tc virtualDescriptors) maybeGetObjectNamesAndIDs(
scName string, dbDesc catalog.DatabaseDescriptor, flags tree.DatabaseListFlags,
) (isVirtual bool, _ tree.TableNames, _ descpb.IDs) {
if tc.vs == nil {
return false, nil, nil
}
entry, ok := tc.vs.GetVirtualSchema(scName)
func (tc virtualDescriptors) forEachVirtualObject(
sc catalog.SchemaDescriptor, fn func(obj catalog.Descriptor),
) {
scEntry, ok := tc.vs.GetVirtualSchemaByID(sc.GetID())
if !ok {
return false, nil, nil
return
}
names := make(tree.TableNames, 0, entry.NumTables())
IDs := make(descpb.IDs, 0, entry.NumTables())
schemaDesc := entry.Desc()
entry.VisitTables(func(table catalog.VirtualObject) {
name := tree.MakeTableNameWithSchema(
tree.Name(dbDesc.GetName()), tree.Name(schemaDesc.GetName()), tree.Name(table.Desc().GetName()))
name.ExplicitCatalog = flags.ExplicitPrefix
name.ExplicitSchema = flags.ExplicitPrefix
names = append(names, name)
IDs = append(IDs, table.Desc().GetID())
scEntry.VisitTables(func(object catalog.VirtualObject) {
fn(object.Desc())
})
return true, names, IDs
}
45 changes: 14 additions & 31 deletions pkg/sql/catalog/resolver/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,18 @@ type SchemaResolver interface {
tree.TypeReferenceResolver
tree.FunctionReferenceResolver

// Accessor is a crufty name and interface that wraps the *descs.Collection.
Accessor() catalog.Accessor
// GetObjectNamesAndIDs returns the names and IDs of the objects in the
// schema.
GetObjectNamesAndIDs(
ctx context.Context, db catalog.DatabaseDescriptor, sc catalog.SchemaDescriptor,
) (tree.TableNames, descpb.IDs, error)

// MustGetCurrentSessionDatabase returns the database descriptor for the
// current session database.
MustGetCurrentSessionDatabase(ctx context.Context) (catalog.DatabaseDescriptor, error)

// CurrentSearchPath returns the current search path.
CurrentSearchPath() sessiondata.SearchPath
CommonLookupFlagsRequired() tree.CommonLookupFlags
}

// ObjectNameExistingResolver is the helper interface to resolve table
Expand Down Expand Up @@ -85,25 +93,6 @@ type ObjectNameTargetResolver interface {
var ErrNoPrimaryKey = pgerror.Newf(pgcode.NoPrimaryKey,
"requested table does not have a primary key")

// GetObjectNamesAndIDs retrieves the names and IDs of all objects in the
// target database/schema. If explicitPrefix is set, the returned
// table names will have an explicit schema and catalog name.
func GetObjectNamesAndIDs(
ctx context.Context,
txn *kv.Txn,
sr SchemaResolver,
codec keys.SQLCodec,
dbDesc catalog.DatabaseDescriptor,
scName string,
explicitPrefix bool,
) (tree.TableNames, descpb.IDs, error) {
flags := tree.DatabaseListFlags{
CommonLookupFlags: sr.CommonLookupFlagsRequired(),
ExplicitPrefix: explicitPrefix,
}
return sr.Accessor().GetObjectNamesAndIDs(ctx, txn, dbDesc, scName, flags)
}

// ResolveExistingTableObject looks up an existing object.
// If required is true, an error is returned if the object does not exist.
// Optionally, if a desired descriptor type is specified, that type is checked.
Expand Down Expand Up @@ -556,8 +545,6 @@ func ResolveIndex(
ctx context.Context,
schemaResolver SchemaResolver,
tableIndexName *tree.TableIndexName,
txn *kv.Txn,
codec keys.SQLCodec,
required bool,
requireActiveIndex bool,
) (
Expand Down Expand Up @@ -624,7 +611,7 @@ func ResolveIndex(
}

tblFound, tbl, idx, err := findTableContainingIndex(
ctx, tree.Name(tableIndexName.Index), resolvedPrefix, txn, codec, schemaResolver, requireActiveIndex,
ctx, tree.Name(tableIndexName.Index), resolvedPrefix, schemaResolver, requireActiveIndex,
)
if err != nil {
return false, catalog.ResolvedObjectPrefix{}, nil, nil, err
Expand Down Expand Up @@ -667,7 +654,7 @@ func ResolveIndex(
schemaFound = true

candidateFound, tbl, idx, curErr := findTableContainingIndex(
ctx, tree.Name(tableIndexName.Index), candidateResolvedPrefix, txn, codec, schemaResolver, requireActiveIndex,
ctx, tree.Name(tableIndexName.Index), candidateResolvedPrefix, schemaResolver, requireActiveIndex,
)
if curErr != nil {
return false, catalog.ResolvedObjectPrefix{}, nil, nil, curErr
Expand Down Expand Up @@ -777,14 +764,10 @@ func findTableContainingIndex(
ctx context.Context,
indexName tree.Name,
resolvedPrefix catalog.ResolvedObjectPrefix,
txn *kv.Txn,
codec keys.SQLCodec,
schemaResolver SchemaResolver,
requireActiveIndex bool,
) (found bool, tblDesc catalog.TableDescriptor, idxDesc catalog.Index, err error) {
dsNames, _, err := GetObjectNamesAndIDs(
ctx, txn, schemaResolver, codec, resolvedPrefix.Database, resolvedPrefix.Schema.GetName(), true,
)
dsNames, _, err := schemaResolver.GetObjectNamesAndIDs(ctx, resolvedPrefix.Database, resolvedPrefix.Schema)

if err != nil {
return false, nil, nil, err
Expand Down
Loading

0 comments on commit 966463a

Please sign in to comment.