Skip to content

Commit

Permalink
Merge #93611
Browse files Browse the repository at this point in the history
93611: catkv,*: remove catkv.Direct interface r=postamar a=postamar

Informs #64089.

Release note: None

Co-authored-by: Marius Posta <[email protected]>
  • Loading branch information
craig[bot] and Marius Posta committed Dec 15, 2022
2 parents 366be3f + f62d2aa commit 53b1dea
Show file tree
Hide file tree
Showing 50 changed files with 527 additions and 602 deletions.
33 changes: 25 additions & 8 deletions pkg/ccl/backupccl/restore_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ func allocateDescriptorRewrites(
}

// See if there is an existing schema with the same name.
id, err := col.Direct().LookupSchemaID(ctx, txn, parentID, sc.Name)
id, err := col.LookupSchemaID(ctx, txn, parentID, sc.Name)
if err != nil {
return err
}
Expand All @@ -335,7 +335,11 @@ func allocateDescriptorRewrites(
} else {
// If we found an existing schema, then we need to remap all references
// to this schema to the existing one.
desc, err := col.Direct().MustGetSchemaDescByID(ctx, txn, id)
desc, err := col.GetImmutableSchemaByID(ctx, txn, id, tree.SchemaLookupFlags{
AvoidLeased: true,
IncludeDropped: true,
IncludeOffline: true,
})
if err != nil {
return err
}
Expand Down Expand Up @@ -378,13 +382,17 @@ func allocateDescriptorRewrites(
// Check that the table name is _not_ in use.
// This would fail the CPut later anyway, but this yields a prettier error.
tableName := tree.NewUnqualifiedTableName(tree.Name(table.GetName()))
err := col.Direct().CheckObjectCollision(ctx, txn, parentID, table.GetParentSchemaID(), tableName)
err := descs.CheckObjectNameCollision(ctx, col, txn, parentID, table.GetParentSchemaID(), tableName)
if err != nil {
return err
}

// Check privileges.
parentDB, err := col.Direct().MustGetDatabaseDescByID(ctx, txn, parentID)
_, parentDB, err := col.GetImmutableDatabaseByID(ctx, txn, parentID, tree.DatabaseLookupFlags{
AvoidLeased: true,
IncludeDropped: true,
IncludeOffline: true,
})
if err != nil {
return errors.Wrapf(err,
"failed to lookup parent DB %d", errors.Safe(parentID))
Expand Down Expand Up @@ -449,7 +457,11 @@ func allocateDescriptorRewrites(
targetDB, typ.Name)
}
// Check privileges on the parent DB.
parentDB, err := col.Direct().MustGetDatabaseDescByID(ctx, txn, parentID)
_, parentDB, err := col.GetImmutableDatabaseByID(ctx, txn, parentID, tree.DatabaseLookupFlags{
AvoidLeased: true,
IncludeDropped: true,
IncludeOffline: true,
})
if err != nil {
return errors.Wrapf(err,
"failed to lookup parent DB %d", errors.Safe(parentID))
Expand All @@ -464,8 +476,9 @@ func allocateDescriptorRewrites(
}
return
}
desc, err := col.Direct().GetDescriptorCollidingWithObject(
desc, err := descs.GetDescriptorCollidingWithObjectName(
ctx,
col,
txn,
parentID,
getParentSchemaID(typ),
Expand All @@ -492,7 +505,7 @@ func allocateDescriptorRewrites(
// Ensure that there isn't a collision with the array type name.
arrTyp := typesByID[typ.ArrayTypeID]
typeName := tree.NewUnqualifiedTypeName(arrTyp.GetName())
err = col.Direct().CheckObjectCollision(ctx, txn, parentID, getParentSchemaID(typ), typeName)
err = descs.CheckObjectNameCollision(ctx, col, txn, parentID, getParentSchemaID(typ), typeName)
if err != nil {
return errors.Wrapf(err, "name collision for %q's array type", typ.Name)
}
Expand Down Expand Up @@ -696,7 +709,11 @@ func getDatabaseIDAndDesc(
return dbID, nil, errors.Errorf("a database named %q needs to exist", targetDB)
}
// Check privileges on the parent DB.
dbDesc, err = col.Direct().MustGetDatabaseDescByID(ctx, txn, dbID)
_, dbDesc, err = col.GetImmutableDatabaseByID(ctx, txn, dbID, tree.DatabaseLookupFlags{
AvoidLeased: true,
IncludeDropped: true,
IncludeOffline: true,
})
if err != nil {
return 0, nil, errors.Wrapf(err,
"failed to lookup parent DB %d", errors.Safe(dbID))
Expand Down
8 changes: 7 additions & 1 deletion pkg/ccl/backupccl/targets.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,13 @@ func checkMultiRegionCompatible(
// For REGION BY TABLE IN <region> tables, allow the restore if the
// database has the region.
regionEnumID := database.GetRegionConfig().RegionEnumID
regionEnum, err := col.Direct().MustGetTypeDescByID(ctx, txn, regionEnumID)
regionEnum, err := col.GetImmutableTypeByID(ctx, txn, regionEnumID, tree.ObjectLookupFlags{
CommonLookupFlags: tree.CommonLookupFlags{
AvoidLeased: true,
IncludeDropped: true,
IncludeOffline: true,
},
})
if err != nil {
return err
}
Expand Down
15 changes: 9 additions & 6 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/resolver"
"github.com/cockroachdb/cockroach/pkg/sql/exprutil"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
Expand Down Expand Up @@ -1180,18 +1179,22 @@ func getQualifiedTableNameObj(
ctx context.Context, execCfg *sql.ExecutorConfig, txn *kv.Txn, desc catalog.TableDescriptor,
) (tree.TableName, error) {
col := execCfg.CollectionFactory.NewCollection(ctx)
dbDesc, err := col.Direct().MustGetDatabaseDescByID(ctx, txn, desc.GetParentID())
flags := tree.CommonLookupFlags{
AvoidLeased: true,
IncludeDropped: true,
IncludeOffline: true,
}
_, db, err := col.GetImmutableDatabaseByID(ctx, txn, desc.GetParentID(), flags)
if err != nil {
return tree.TableName{}, err
}
schemaID := desc.GetParentSchemaID()
schemaName, err := resolver.ResolveSchemaNameByID(ctx, txn, execCfg.Codec, dbDesc, schemaID)
sc, err := col.GetImmutableSchemaByID(ctx, txn, desc.GetParentSchemaID(), flags)
if err != nil {
return tree.TableName{}, err
}
tbName := tree.MakeTableNameWithSchema(
tree.Name(dbDesc.GetName()),
tree.Name(schemaName),
tree.Name(db.GetName()),
tree.Name(sc.GetName()),
tree.Name(desc.GetName()),
)
return tbName, nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduledjobs/schedulebase/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func FullyQualifyTables(
if err != nil {
return err
}
schemaID, err = col.Direct().ResolveSchemaID(ctx, txn, dbDesc.GetID(), tp.SchemaName.String())
schemaID, err = col.LookupSchemaID(ctx, txn, dbDesc.GetID(), tp.SchemaName.String())
return err
}); err != nil {
return nil, err
Expand Down
4 changes: 3 additions & 1 deletion pkg/sql/alter_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc"
"github.com/cockroachdb/cockroach/pkg/sql/decodeusername"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
Expand Down Expand Up @@ -217,8 +218,9 @@ func (p *planner) dropEnumValue(
}

func (p *planner) renameType(ctx context.Context, n *alterTypeNode, newName string) error {
err := p.Descriptors().Direct().CheckObjectCollision(
err := descs.CheckObjectNameCollision(
ctx,
p.Descriptors(),
p.txn,
n.desc.ParentID,
n.desc.ParentSchemaID,
Expand Down
16 changes: 14 additions & 2 deletions pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -1379,7 +1379,13 @@ func (sc *SchemaChanger) updateJobRunningStatus(
) (tableDesc catalog.TableDescriptor, err error) {
err = DescsTxn(ctx, sc.execCfg, func(ctx context.Context, txn *kv.Txn, col *descs.Collection) (err error) {
// Read table descriptor without holding a lease.
tableDesc, err = col.Direct().MustGetTableDescByID(ctx, txn, sc.descID)
tableDesc, err = col.GetImmutableTableByID(ctx, txn, sc.descID, tree.ObjectLookupFlags{
CommonLookupFlags: tree.CommonLookupFlags{
AvoidLeased: true,
IncludeDropped: true,
IncludeOffline: true,
},
})
if err != nil {
return err
}
Expand Down Expand Up @@ -2635,7 +2641,13 @@ func getTargetTablesAndFk(
if fk == nil {
return nil, nil, nil, errors.AssertionFailedf("foreign key %s does not exist", fkName)
}
targetTable, err = descsCol.Direct().MustGetTableDescByID(ctx, txn, fk.ReferencedTableID)
targetTable, err = descsCol.GetImmutableTableByID(ctx, txn, fk.ReferencedTableID, tree.ObjectLookupFlags{
CommonLookupFlags: tree.CommonLookupFlags{
AvoidLeased: true,
IncludeDropped: true,
IncludeOffline: true,
},
})
if err != nil {
return nil, nil, nil, err
}
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/catalog/descs/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ go_library(
"errors.go",
"factory.go",
"function.go",
"helpers.go",
"hydrate.go",
"leased_descriptors.go",
"object.go",
Expand Down
18 changes: 10 additions & 8 deletions pkg/sql/catalog/descs/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -710,6 +710,16 @@ func (tc *Collection) GetAll(ctx context.Context, txn *kv.Txn) (nstree.Catalog,
return ret.Catalog, nil
}

// GetAllFromStorageUnvalidated delegates to an uncached catkv.CatalogReader's
// ScanAll method. Nothing is cached, validated or hydrated. This is to be used
// sparingly and only in situations which warrant it, where an unmediated view
// of the stored catalog is explicitly desired for observability.
func (tc *Collection) GetAllFromStorageUnvalidated(
ctx context.Context, txn *kv.Txn,
) (nstree.Catalog, error) {
return catkv.NewUncachedCatalogReader(tc.codec()).ScanAll(ctx, txn)
}

// GetAllDatabases is like GetAll but filtered to non-dropped databases.
func (tc *Collection) GetAllDatabases(ctx context.Context, txn *kv.Txn) (nstree.Catalog, error) {
stored, err := tc.cr.ScanNamespaceForDatabases(ctx, txn)
Expand Down Expand Up @@ -1196,14 +1206,6 @@ func (tc *Collection) GetConstraintComment(
return tc.GetComment(catalogkeys.MakeCommentKey(uint32(tableID), uint32(constraintID), catalogkeys.ConstraintCommentType))
}

// Direct exports the catkv.Direct interface.
type Direct = catkv.Direct

// Direct provides direct access to the underlying KV-storage.
func (tc *Collection) Direct() Direct {
return catkv.MakeDirect(tc.codec(), tc.version, tc.validationModeProvider)
}

// MakeTestCollection makes a Collection that can be used for tests.
func MakeTestCollection(ctx context.Context, leaseManager *lease.Manager) Collection {
settings := cluster.MakeTestingClusterSettings()
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/catalog/descs/collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1045,7 +1045,7 @@ func TestHydrateCatalog(t *testing.T) {
require.NoError(t, sql.DescsTxn(ctx, &execCfg, func(
ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
) error {
cat, err := descriptors.Direct().GetCatalogUnvalidated(ctx, txn)
cat, err := descriptors.GetAllFromStorageUnvalidated(ctx, txn)
if err != nil {
return err
}
Expand All @@ -1061,7 +1061,7 @@ func TestHydrateCatalog(t *testing.T) {
require.NoError(t, sql.DescsTxn(ctx, &execCfg, func(
ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
) error {
cat, err := descriptors.Direct().GetCatalogUnvalidated(ctx, txn)
cat, err := descriptors.GetAllFromStorageUnvalidated(ctx, txn)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/catalog/descs/descriptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,11 +207,11 @@ func (tc *Collection) MarkUncommittedCommentDeletedForTable(tblID descpb.ID) {
//
// The Required flag is ignored and always overridden.
func (tc *Collection) getDescriptorByID(
ctx context.Context, txn *kv.Txn, flags tree.CommonLookupFlags, ids ...descpb.ID,
ctx context.Context, txn *kv.Txn, flags tree.CommonLookupFlags, id descpb.ID,
) (catalog.Descriptor, error) {
var arr [1]catalog.Descriptor
if err := getDescriptorsByID(
ctx, tc, txn, flags, arr[:], ids...,
ctx, tc, txn, flags, arr[:], id,
); err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/catalog/descs/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func NewCollectionFactory(
leaseMgr: leaseMgr,
virtualSchemas: virtualSchemas,
hydrated: hydrated,
systemDatabase: catkv.NewSystemDatabaseCache(leaseMgr.Codec(), settings),
systemDatabase: leaseMgr.SystemDatabaseCache(),
spanConfigSplitter: spanConfigSplitter,
spanConfigLimiter: spanConfigLimiter,
defaultMonitor: mon.NewUnlimitedMonitor(ctx, "CollectionFactoryDefaultUnlimitedMonitor",
Expand Down
66 changes: 66 additions & 0 deletions pkg/sql/catalog/descs/helpers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package descs

import (
"context"

"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
"github.com/cockroachdb/errors"
)

// GetDescriptorCollidingWithObjectName returns the descriptor which collides
// with the desired name if it exists.
func GetDescriptorCollidingWithObjectName(
ctx context.Context, tc *Collection, txn *kv.Txn, parentID, parentSchemaID descpb.ID, name string,
) (catalog.Descriptor, error) {
id, err := tc.LookupObjectID(ctx, txn, parentID, parentSchemaID, name)
if err != nil || id == descpb.InvalidID {
return nil, err
}
// At this point the ID is already in use by another object.
flags := tree.CommonLookupFlags{
AvoidLeased: true,
IncludeOffline: true,
IncludeDropped: true,
}
desc, err := tc.getDescriptorByID(ctx, txn, flags, id)
if errors.Is(err, catalog.ErrDescriptorNotFound) {
// Since the ID exists the descriptor should absolutely exist.
err = errors.NewAssertionErrorWithWrappedErrf(err,
"parentID=%d parentSchemaID=%d name=%q has ID=%d",
parentID, parentSchemaID, name, id)
}
return desc, err
}

// CheckObjectNameCollision returns an error if the object name is already used.
func CheckObjectNameCollision(
ctx context.Context,
tc *Collection,
txn *kv.Txn,
parentID, parentSchemaID descpb.ID,
name tree.ObjectName,
) error {
d, err := GetDescriptorCollidingWithObjectName(ctx, tc, txn, parentID, parentSchemaID, name.Object())
if err != nil || d == nil {
return err
}
maybeQualifiedName := name.Object()
if name.Catalog() != "" && name.Schema() != "" {
maybeQualifiedName = name.FQString()
}
return sqlerrors.MakeObjectAlreadyExistsError(d.DescriptorProto(), maybeQualifiedName)
}
Loading

0 comments on commit 53b1dea

Please sign in to comment.