Skip to content

Commit

Permalink
sql,resolver: remove GetForDatabase
Browse files Browse the repository at this point in the history
This removes the last usage of uncached catkv.CatalogReader outside of
testing.

Release note: None
  • Loading branch information
Marius Posta committed Dec 15, 2022
1 parent 5cc0140 commit 6a1a2d1
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 107 deletions.
13 changes: 6 additions & 7 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/resolver"
"github.com/cockroachdb/cockroach/pkg/sql/exprutil"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
Expand Down Expand Up @@ -1180,22 +1179,22 @@ func getQualifiedTableNameObj(
ctx context.Context, execCfg *sql.ExecutorConfig, txn *kv.Txn, desc catalog.TableDescriptor,
) (tree.TableName, error) {
col := execCfg.CollectionFactory.NewCollection(ctx)
_, dbDesc, err := col.GetImmutableDatabaseByID(ctx, txn, desc.GetParentID(), tree.DatabaseLookupFlags{
flags := tree.CommonLookupFlags{
AvoidLeased: true,
IncludeDropped: true,
IncludeOffline: true,
})
}
_, db, err := col.GetImmutableDatabaseByID(ctx, txn, desc.GetParentID(), flags)
if err != nil {
return tree.TableName{}, err
}
schemaID := desc.GetParentSchemaID()
schemaName, err := resolver.ResolveSchemaNameByID(ctx, txn, execCfg.Codec, dbDesc, schemaID)
sc, err := col.GetImmutableSchemaByID(ctx, txn, desc.GetParentSchemaID(), flags)
if err != nil {
return tree.TableName{}, err
}
tbName := tree.MakeTableNameWithSchema(
tree.Name(dbDesc.GetName()),
tree.Name(schemaName),
tree.Name(db.GetName()),
tree.Name(sc.GetName()),
tree.Name(desc.GetName()),
)
return tbName, nil
Expand Down
7 changes: 4 additions & 3 deletions pkg/sql/catalog/internal/catkv/catalog_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,10 @@ type CatalogReader interface {
) (nstree.Catalog, error)
}

// NewUncachedCatalogReader is the constructor for the default CatalogReader
// implementation without a SystemDatabaseCache.
func NewUncachedCatalogReader(codec keys.SQLCodec) CatalogReader {
// NewTestingUncachedCatalogReader is the constructor for the default
// CatalogReader implementation without a SystemDatabaseCache.
// This should not be used outside of testing purposes.
func NewTestingUncachedCatalogReader(codec keys.SQLCodec) CatalogReader {
return &catalogReader{
codec: codec,
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/catalog/internal/catkv/catalog_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestDataDriven(t *testing.T) {
v := execCfg.Settings.Version.ActiveVersion(ctx)
sdc := catkv.NewSystemDatabaseCache(execCfg.Codec, execCfg.Settings)
ccr := catkv.NewCatalogReader(execCfg.Codec, v, sdc, nil /* maybeMonitor */)
ucr := catkv.NewUncachedCatalogReader(execCfg.Codec)
ucr := catkv.NewTestingUncachedCatalogReader(execCfg.Codec)

datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) (ret string) {
h := testHelper{
Expand Down
5 changes: 0 additions & 5 deletions pkg/sql/catalog/resolver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,8 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/sql/catalog/resolver",
visibility = ["//visibility:public"],
deps = [
"//pkg/keys",
"//pkg/kv",
"//pkg/sql/catalog",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/internal/catkv",
"//pkg/sql/catalog/nstree",
"//pkg/sql/catalog/tabledesc",
"//pkg/sql/catalog/typedesc",
"//pkg/sql/pgwire/pgcode",
Expand All @@ -22,7 +18,6 @@ go_library(
"//pkg/sql/sessiondata",
"//pkg/sql/sqlerrors",
"//pkg/util/hlc",
"//pkg/util/log",
"@com_github_cockroachdb_errors//:errors",
],
)
Expand Down
62 changes: 0 additions & 62 deletions pkg/sql/catalog/resolver/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,8 @@ package resolver
import (
"context"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/internal/catkv"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/nstree"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
Expand All @@ -28,7 +24,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -283,70 +278,13 @@ func ResolveTargetObject(
return scInfo, prefix, nil
}

// ResolveSchemaNameByID resolves a schema's name based on db and schema id.
// Instead, we have to rely on a scan of the kv table.
// TODO (SQLSchema): The remaining uses of this should be plumbed through
//
// the desc.Collection's ResolveSchemaByID.
func ResolveSchemaNameByID(
ctx context.Context,
txn *kv.Txn,
codec keys.SQLCodec,
db catalog.DatabaseDescriptor,
schemaID descpb.ID,
) (string, error) {
// Fast-path for public schema and virtual schemas, to avoid hot lookups.
staticSchemaMap := catconstants.GetStaticSchemaIDMap()
if schemaName, ok := staticSchemaMap[uint32(schemaID)]; ok {
return schemaName, nil
}
schemas, err := GetForDatabase(ctx, txn, codec, db)
if err != nil {
return "", err
}
if schema, ok := schemas[schemaID]; ok {
return schema.Name, nil
}
return "", errors.Newf("unable to resolve schema id %d for db %d", schemaID, db.GetID())
}

// SchemaEntryForDB entry for an individual schema,
// which includes the name and modification timestamp.
type SchemaEntryForDB struct {
Name string
Timestamp hlc.Timestamp
}

// GetForDatabase looks up and returns all available
// schema ids to SchemaEntryForDB structures for a
// given database.
func GetForDatabase(
ctx context.Context, txn *kv.Txn, codec keys.SQLCodec, db catalog.DatabaseDescriptor,
) (map[descpb.ID]SchemaEntryForDB, error) {
log.Eventf(ctx, "fetching all schema descriptor IDs for database %q (%d)", db.GetName(), db.GetID())
cr := catkv.NewUncachedCatalogReader(codec)
c, err := cr.ScanNamespaceForDatabaseSchemas(ctx, txn, db)
if err != nil {
return nil, err
}
ret := make(map[descpb.ID]SchemaEntryForDB)
// This is needed at least for the temp system db during restores.
if !db.HasPublicSchemaWithDescriptor() {
ret[keys.PublicSchemaIDForBackup] = SchemaEntryForDB{
Name: catconstants.PublicSchemaName,
Timestamp: txn.ReadTimestamp(),
}
}
_ = c.ForEachNamespaceEntry(func(e nstree.NamespaceEntry) error {
ret[e.GetID()] = SchemaEntryForDB{
Name: e.GetName(),
Timestamp: e.GetMVCCTimestamp(),
}
return nil
})
return ret, nil
}

// ResolveExisting performs name resolution for an object name when
// the target object is expected to exist already. It does not
// mutate the input name. It additionally returns the resolved
Expand Down
57 changes: 28 additions & 29 deletions pkg/sql/temporary_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/resolver"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/nstree"
"github.com/cockroachdb/cockroach/pkg/sql/clusterunique"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
Expand Down Expand Up @@ -317,13 +317,7 @@ func cleanupTempSchemaObjects(
if err != nil {
return err
}
schema, err := resolver.ResolveSchemaNameByID(
ctx,
txn,
codec,
db,
dTableDesc.GetParentSchemaID(),
)
sc, err := descsCol.GetImmutableSchemaByID(ctx, txn, dTableDesc.GetParentSchemaID(), flags)
if err != nil {
return err
}
Expand All @@ -335,7 +329,7 @@ func cleanupTempSchemaObjects(
if dependentColIDs.Contains(int(col.GetID())) {
tbName := tree.MakeTableNameWithSchema(
tree.Name(db.GetName()),
tree.Name(schema),
tree.Name(sc.GetName()),
tree.Name(dTableDesc.GetName()),
)
_, err = ie.ExecEx(
Expand Down Expand Up @@ -532,43 +526,48 @@ func (c *TemporaryObjectCleaner) doTemporaryObjectCleanup(
// Only see temporary schemas after some delay as safety
// mechanism.
waitTimeForCreation := TempObjectWaitInterval.Get(&c.settings.SV)
// Build a set of all databases with temporary objects.
var allDbDescs []catalog.DatabaseDescriptor
descsCol := c.collectionFactory.NewCollection(ctx)
if err := retryFunc(ctx, func() error {
var err error
allDbDescs, err = descsCol.GetAllDatabaseDescriptors(ctx, txn)
// Build a set of all databases with temporary objects.
var dbs nstree.Catalog
if err := retryFunc(ctx, func() (err error) {
dbs, err = descsCol.GetAllDatabases(ctx, txn)
return err
}); err != nil {
return err
}

sessionIDs := make(map[clusterunique.ID]struct{})
for _, dbDesc := range allDbDescs {
var schemaEntries map[descpb.ID]resolver.SchemaEntryForDB
if err := retryFunc(ctx, func() error {
var err error
schemaEntries, err = resolver.GetForDatabase(ctx, txn, c.codec, dbDesc)
if err := dbs.ForEachDescriptor(func(dbDesc catalog.Descriptor) error {
db, err := catalog.AsDatabaseDescriptor(dbDesc)
if err != nil {
return err
}
var schemas nstree.Catalog
if err := retryFunc(ctx, func() (err error) {
schemas, err = descsCol.GetAllSchemasInDatabase(ctx, txn, db)
return err
}); err != nil {
return err
}
for _, scEntry := range schemaEntries {
return schemas.ForEachNamespaceEntry(func(e nstree.NamespaceEntry) error {
if e.GetParentSchemaID() != descpb.InvalidID {
return nil
}
// Skip over any temporary objects that are not old enough,
// we intentionally use a delay to avoid problems.
if !scEntry.Timestamp.Less(txn.ReadTimestamp().Add(-waitTimeForCreation.Nanoseconds(), 0)) {
continue
if !e.GetMVCCTimestamp().Less(txn.ReadTimestamp().Add(-waitTimeForCreation.Nanoseconds(), 0)) {
return nil
}
isTempSchema, sessionID, err := temporarySchemaSessionID(scEntry.Name)
if err != nil {
if isTempSchema, sessionID, err := temporarySchemaSessionID(e.GetName()); err != nil {
// This should not cause an error.
log.Warningf(ctx, "could not parse %q as temporary schema name", scEntry)
continue
}
if isTempSchema {
log.Warningf(ctx, "could not parse %q as temporary schema name", e.GetName())
} else if isTempSchema {
sessionIDs[sessionID] = struct{}{}
}
}
return nil
})
}); err != nil {
return err
}
log.Infof(ctx, "found %d temporary schemas", len(sessionIDs))

Expand Down

0 comments on commit 6a1a2d1

Please sign in to comment.