Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

catkv,*: remove catkv.Direct interface #93611

Merged
merged 9 commits into from
Dec 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 25 additions & 8 deletions pkg/ccl/backupccl/restore_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ func allocateDescriptorRewrites(
}

// See if there is an existing schema with the same name.
id, err := col.Direct().LookupSchemaID(ctx, txn, parentID, sc.Name)
id, err := col.LookupSchemaID(ctx, txn, parentID, sc.Name)
if err != nil {
return err
}
Expand All @@ -335,7 +335,11 @@ func allocateDescriptorRewrites(
} else {
// If we found an existing schema, then we need to remap all references
// to this schema to the existing one.
desc, err := col.Direct().MustGetSchemaDescByID(ctx, txn, id)
desc, err := col.GetImmutableSchemaByID(ctx, txn, id, tree.SchemaLookupFlags{
AvoidLeased: true,
IncludeDropped: true,
IncludeOffline: true,
})
if err != nil {
return err
}
Expand Down Expand Up @@ -378,13 +382,17 @@ func allocateDescriptorRewrites(
// Check that the table name is _not_ in use.
// This would fail the CPut later anyway, but this yields a prettier error.
tableName := tree.NewUnqualifiedTableName(tree.Name(table.GetName()))
err := col.Direct().CheckObjectCollision(ctx, txn, parentID, table.GetParentSchemaID(), tableName)
err := descs.CheckObjectNameCollision(ctx, col, txn, parentID, table.GetParentSchemaID(), tableName)
if err != nil {
return err
}

// Check privileges.
parentDB, err := col.Direct().MustGetDatabaseDescByID(ctx, txn, parentID)
_, parentDB, err := col.GetImmutableDatabaseByID(ctx, txn, parentID, tree.DatabaseLookupFlags{
AvoidLeased: true,
IncludeDropped: true,
IncludeOffline: true,
})
if err != nil {
return errors.Wrapf(err,
"failed to lookup parent DB %d", errors.Safe(parentID))
Expand Down Expand Up @@ -449,7 +457,11 @@ func allocateDescriptorRewrites(
targetDB, typ.Name)
}
// Check privileges on the parent DB.
parentDB, err := col.Direct().MustGetDatabaseDescByID(ctx, txn, parentID)
_, parentDB, err := col.GetImmutableDatabaseByID(ctx, txn, parentID, tree.DatabaseLookupFlags{
AvoidLeased: true,
IncludeDropped: true,
IncludeOffline: true,
})
if err != nil {
return errors.Wrapf(err,
"failed to lookup parent DB %d", errors.Safe(parentID))
Expand All @@ -464,8 +476,9 @@ func allocateDescriptorRewrites(
}
return
}
desc, err := col.Direct().GetDescriptorCollidingWithObject(
desc, err := descs.GetDescriptorCollidingWithObjectName(
ctx,
col,
txn,
parentID,
getParentSchemaID(typ),
Expand All @@ -492,7 +505,7 @@ func allocateDescriptorRewrites(
// Ensure that there isn't a collision with the array type name.
arrTyp := typesByID[typ.ArrayTypeID]
typeName := tree.NewUnqualifiedTypeName(arrTyp.GetName())
err = col.Direct().CheckObjectCollision(ctx, txn, parentID, getParentSchemaID(typ), typeName)
err = descs.CheckObjectNameCollision(ctx, col, txn, parentID, getParentSchemaID(typ), typeName)
if err != nil {
return errors.Wrapf(err, "name collision for %q's array type", typ.Name)
}
Expand Down Expand Up @@ -696,7 +709,11 @@ func getDatabaseIDAndDesc(
return dbID, nil, errors.Errorf("a database named %q needs to exist", targetDB)
}
// Check privileges on the parent DB.
dbDesc, err = col.Direct().MustGetDatabaseDescByID(ctx, txn, dbID)
_, dbDesc, err = col.GetImmutableDatabaseByID(ctx, txn, dbID, tree.DatabaseLookupFlags{
AvoidLeased: true,
IncludeDropped: true,
IncludeOffline: true,
})
if err != nil {
return 0, nil, errors.Wrapf(err,
"failed to lookup parent DB %d", errors.Safe(dbID))
Expand Down
8 changes: 7 additions & 1 deletion pkg/ccl/backupccl/targets.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,13 @@ func checkMultiRegionCompatible(
// For REGION BY TABLE IN <region> tables, allow the restore if the
// database has the region.
regionEnumID := database.GetRegionConfig().RegionEnumID
regionEnum, err := col.Direct().MustGetTypeDescByID(ctx, txn, regionEnumID)
regionEnum, err := col.GetImmutableTypeByID(ctx, txn, regionEnumID, tree.ObjectLookupFlags{
CommonLookupFlags: tree.CommonLookupFlags{
AvoidLeased: true,
IncludeDropped: true,
IncludeOffline: true,
},
})
if err != nil {
return err
}
Expand Down
15 changes: 9 additions & 6 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/resolver"
"github.com/cockroachdb/cockroach/pkg/sql/exprutil"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
Expand Down Expand Up @@ -1180,18 +1179,22 @@ func getQualifiedTableNameObj(
ctx context.Context, execCfg *sql.ExecutorConfig, txn *kv.Txn, desc catalog.TableDescriptor,
) (tree.TableName, error) {
col := execCfg.CollectionFactory.NewCollection(ctx)
dbDesc, err := col.Direct().MustGetDatabaseDescByID(ctx, txn, desc.GetParentID())
flags := tree.CommonLookupFlags{
AvoidLeased: true,
IncludeDropped: true,
IncludeOffline: true,
}
_, db, err := col.GetImmutableDatabaseByID(ctx, txn, desc.GetParentID(), flags)
if err != nil {
return tree.TableName{}, err
}
schemaID := desc.GetParentSchemaID()
schemaName, err := resolver.ResolveSchemaNameByID(ctx, txn, execCfg.Codec, dbDesc, schemaID)
sc, err := col.GetImmutableSchemaByID(ctx, txn, desc.GetParentSchemaID(), flags)
if err != nil {
return tree.TableName{}, err
}
tbName := tree.MakeTableNameWithSchema(
tree.Name(dbDesc.GetName()),
tree.Name(schemaName),
tree.Name(db.GetName()),
tree.Name(sc.GetName()),
tree.Name(desc.GetName()),
)
return tbName, nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduledjobs/schedulebase/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func FullyQualifyTables(
if err != nil {
return err
}
schemaID, err = col.Direct().ResolveSchemaID(ctx, txn, dbDesc.GetID(), tp.SchemaName.String())
schemaID, err = col.LookupSchemaID(ctx, txn, dbDesc.GetID(), tp.SchemaName.String())
return err
}); err != nil {
return nil, err
Expand Down
4 changes: 3 additions & 1 deletion pkg/sql/alter_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc"
"github.com/cockroachdb/cockroach/pkg/sql/decodeusername"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
Expand Down Expand Up @@ -217,8 +218,9 @@ func (p *planner) dropEnumValue(
}

func (p *planner) renameType(ctx context.Context, n *alterTypeNode, newName string) error {
err := p.Descriptors().Direct().CheckObjectCollision(
err := descs.CheckObjectNameCollision(
ctx,
p.Descriptors(),
p.txn,
n.desc.ParentID,
n.desc.ParentSchemaID,
Expand Down
16 changes: 14 additions & 2 deletions pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -1379,7 +1379,13 @@ func (sc *SchemaChanger) updateJobRunningStatus(
) (tableDesc catalog.TableDescriptor, err error) {
err = DescsTxn(ctx, sc.execCfg, func(ctx context.Context, txn *kv.Txn, col *descs.Collection) (err error) {
// Read table descriptor without holding a lease.
tableDesc, err = col.Direct().MustGetTableDescByID(ctx, txn, sc.descID)
tableDesc, err = col.GetImmutableTableByID(ctx, txn, sc.descID, tree.ObjectLookupFlags{
CommonLookupFlags: tree.CommonLookupFlags{
AvoidLeased: true,
IncludeDropped: true,
IncludeOffline: true,
},
})
if err != nil {
return err
}
Expand Down Expand Up @@ -2635,7 +2641,13 @@ func getTargetTablesAndFk(
if fk == nil {
return nil, nil, nil, errors.AssertionFailedf("foreign key %s does not exist", fkName)
}
targetTable, err = descsCol.Direct().MustGetTableDescByID(ctx, txn, fk.ReferencedTableID)
targetTable, err = descsCol.GetImmutableTableByID(ctx, txn, fk.ReferencedTableID, tree.ObjectLookupFlags{
CommonLookupFlags: tree.CommonLookupFlags{
AvoidLeased: true,
IncludeDropped: true,
IncludeOffline: true,
},
})
if err != nil {
return nil, nil, nil, err
}
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/catalog/descs/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ go_library(
"errors.go",
"factory.go",
"function.go",
"helpers.go",
"hydrate.go",
"leased_descriptors.go",
"object.go",
Expand Down
18 changes: 10 additions & 8 deletions pkg/sql/catalog/descs/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -710,6 +710,16 @@ func (tc *Collection) GetAll(ctx context.Context, txn *kv.Txn) (nstree.Catalog,
return ret.Catalog, nil
}

// GetAllFromStorageUnvalidated delegates to an uncached catkv.CatalogReader's
// ScanAll method. Nothing is cached, validated or hydrated. This is to be used
// sparingly and only in situations which warrant it, where an unmediated view
// of the stored catalog is explicitly desired for observability.
func (tc *Collection) GetAllFromStorageUnvalidated(
ctx context.Context, txn *kv.Txn,
) (nstree.Catalog, error) {
return catkv.NewUncachedCatalogReader(tc.codec()).ScanAll(ctx, txn)
}

// GetAllDatabases is like GetAll but filtered to non-dropped databases.
func (tc *Collection) GetAllDatabases(ctx context.Context, txn *kv.Txn) (nstree.Catalog, error) {
stored, err := tc.cr.ScanNamespaceForDatabases(ctx, txn)
Expand Down Expand Up @@ -1196,14 +1206,6 @@ func (tc *Collection) GetConstraintComment(
return tc.GetComment(catalogkeys.MakeCommentKey(uint32(tableID), uint32(constraintID), catalogkeys.ConstraintCommentType))
}

// Direct exports the catkv.Direct interface.
type Direct = catkv.Direct

// Direct provides direct access to the underlying KV-storage.
func (tc *Collection) Direct() Direct {
return catkv.MakeDirect(tc.codec(), tc.version, tc.validationModeProvider)
}

// MakeTestCollection makes a Collection that can be used for tests.
func MakeTestCollection(ctx context.Context, leaseManager *lease.Manager) Collection {
settings := cluster.MakeTestingClusterSettings()
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/catalog/descs/collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1045,7 +1045,7 @@ func TestHydrateCatalog(t *testing.T) {
require.NoError(t, sql.DescsTxn(ctx, &execCfg, func(
ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
) error {
cat, err := descriptors.Direct().GetCatalogUnvalidated(ctx, txn)
cat, err := descriptors.GetAllFromStorageUnvalidated(ctx, txn)
if err != nil {
return err
}
Expand All @@ -1061,7 +1061,7 @@ func TestHydrateCatalog(t *testing.T) {
require.NoError(t, sql.DescsTxn(ctx, &execCfg, func(
ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
) error {
cat, err := descriptors.Direct().GetCatalogUnvalidated(ctx, txn)
cat, err := descriptors.GetAllFromStorageUnvalidated(ctx, txn)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/catalog/descs/descriptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,11 +207,11 @@ func (tc *Collection) MarkUncommittedCommentDeletedForTable(tblID descpb.ID) {
//
// The Required flag is ignored and always overridden.
func (tc *Collection) getDescriptorByID(
ctx context.Context, txn *kv.Txn, flags tree.CommonLookupFlags, ids ...descpb.ID,
ctx context.Context, txn *kv.Txn, flags tree.CommonLookupFlags, id descpb.ID,
) (catalog.Descriptor, error) {
var arr [1]catalog.Descriptor
if err := getDescriptorsByID(
ctx, tc, txn, flags, arr[:], ids...,
ctx, tc, txn, flags, arr[:], id,
); err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/catalog/descs/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func NewCollectionFactory(
leaseMgr: leaseMgr,
virtualSchemas: virtualSchemas,
hydrated: hydrated,
systemDatabase: catkv.NewSystemDatabaseCache(leaseMgr.Codec(), settings),
systemDatabase: leaseMgr.SystemDatabaseCache(),
spanConfigSplitter: spanConfigSplitter,
spanConfigLimiter: spanConfigLimiter,
defaultMonitor: mon.NewUnlimitedMonitor(ctx, "CollectionFactoryDefaultUnlimitedMonitor",
Expand Down
66 changes: 66 additions & 0 deletions pkg/sql/catalog/descs/helpers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package descs

import (
"context"

"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
"github.com/cockroachdb/errors"
)

// GetDescriptorCollidingWithObjectName returns the descriptor which collides
// with the desired name if it exists.
func GetDescriptorCollidingWithObjectName(
ctx context.Context, tc *Collection, txn *kv.Txn, parentID, parentSchemaID descpb.ID, name string,
) (catalog.Descriptor, error) {
id, err := tc.LookupObjectID(ctx, txn, parentID, parentSchemaID, name)
if err != nil || id == descpb.InvalidID {
return nil, err
}
// At this point the ID is already in use by another object.
flags := tree.CommonLookupFlags{
AvoidLeased: true,
IncludeOffline: true,
IncludeDropped: true,
}
desc, err := tc.getDescriptorByID(ctx, txn, flags, id)
if errors.Is(err, catalog.ErrDescriptorNotFound) {
// Since the ID exists the descriptor should absolutely exist.
err = errors.NewAssertionErrorWithWrappedErrf(err,
"parentID=%d parentSchemaID=%d name=%q has ID=%d",
parentID, parentSchemaID, name, id)
}
return desc, err
}

// CheckObjectNameCollision returns an error if the object name is already used.
func CheckObjectNameCollision(
ctx context.Context,
tc *Collection,
txn *kv.Txn,
parentID, parentSchemaID descpb.ID,
name tree.ObjectName,
) error {
d, err := GetDescriptorCollidingWithObjectName(ctx, tc, txn, parentID, parentSchemaID, name.Object())
if err != nil || d == nil {
return err
}
maybeQualifiedName := name.Object()
if name.Catalog() != "" && name.Schema() != "" {
maybeQualifiedName = name.FQString()
}
return sqlerrors.MakeObjectAlreadyExistsError(d.DescriptorProto(), maybeQualifiedName)
}
Loading