diff --git a/pkg/bench/rttanalysis/testdata/benchmark_expectations b/pkg/bench/rttanalysis/testdata/benchmark_expectations index 4f75451bfc89..56eb9f44e80e 100644 --- a/pkg/bench/rttanalysis/testdata/benchmark_expectations +++ b/pkg/bench/rttanalysis/testdata/benchmark_expectations @@ -35,9 +35,9 @@ exp,benchmark 17,DropDatabase/drop_database_1_table 18,DropDatabase/drop_database_2_tables 19,DropDatabase/drop_database_3_tables -20,DropRole/drop_1_role -27,DropRole/drop_2_roles -34,DropRole/drop_3_roles +18,DropRole/drop_1_role +25,DropRole/drop_2_roles +32,DropRole/drop_3_roles 17,DropSequence/drop_1_sequence 19,DropSequence/drop_2_sequences 21,DropSequence/drop_3_sequences @@ -92,5 +92,5 @@ exp,benchmark 18,Truncate/truncate_2_column_2_rows 1,VirtualTableQueries/select_crdb_internal.invalid_objects_with_1_fk 1,VirtualTableQueries/select_crdb_internal.tables_with_1_fk -6,VirtualTableQueries/virtual_table_cache_with_point_lookups -21,VirtualTableQueries/virtual_table_cache_with_schema_change +4,VirtualTableQueries/virtual_table_cache_with_point_lookups +12,VirtualTableQueries/virtual_table_cache_with_schema_change diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index 8f5665e35c99..a3b7cd7c8204 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -2188,7 +2188,7 @@ func (r *restoreResumer) dropDescriptors( // descriptor validation. mutSchema.SetDropped() mutSchema.MaybeIncrementVersion() - if err := descsCol.AddUncommittedDescriptor(mutSchema); err != nil { + if err := descsCol.AddUncommittedDescriptor(ctx, mutSchema); err != nil { return err } @@ -2258,7 +2258,7 @@ func (r *restoreResumer) dropDescriptors( // descriptor validation. db.SetDropped() db.MaybeIncrementVersion() - if err := descsCol.AddUncommittedDescriptor(db); err != nil { + if err := descsCol.AddUncommittedDescriptor(ctx, db); err != nil { return err } diff --git a/pkg/sql/catalog/descs/BUILD.bazel b/pkg/sql/catalog/descs/BUILD.bazel index 8fa35ccadcbb..85c55e7fc211 100644 --- a/pkg/sql/catalog/descs/BUILD.bazel +++ b/pkg/sql/catalog/descs/BUILD.bazel @@ -11,10 +11,10 @@ go_library( "dist_sql_type_resolver.go", "factory.go", "hydrate.go", - "kv_descriptors.go", "leased_descriptors.go", "object.go", "schema.go", + "stored_descriptors.go", "synthetic_descriptors.go", "system_database_namespace_cache.go", "system_table.go", @@ -23,7 +23,6 @@ go_library( "temporary_descriptors.go", "txn.go", "type.go", - "uncommitted_descriptors.go", "validate.go", "virtual_descriptors.go", ], diff --git a/pkg/sql/catalog/descs/collection.go b/pkg/sql/catalog/descs/collection.go index 5c205589c7b8..748b6112bd7c 100644 --- a/pkg/sql/catalog/descs/collection.go +++ b/pkg/sql/catalog/descs/collection.go @@ -57,7 +57,7 @@ func makeCollection( hydratedTables: hydratedTables, virtual: makeVirtualDescriptors(virtualSchemas), leased: makeLeasedDescriptors(leaseMgr), - kv: makeKVDescriptors(codec, systemNamespace, monitor), + stored: makeStoredDescriptors(codec, systemNamespace, monitor), temporary: makeTemporaryDescriptors(settings, codec, temporarySchemaProvider), direct: makeDirect(ctx, codec, settings), } @@ -83,17 +83,17 @@ type Collection struct { // the transaction using them is complete. leased leasedDescriptors - // Descriptors modified by the uncommitted transaction affiliated with this - // Collection. This allows a transaction to see its own modifications while - // bypassing the descriptor lease mechanism. The lease mechanism will have its - // own transaction to read the descriptor and will hang waiting for the - // uncommitted changes to the descriptor if this transaction is PRIORITY HIGH. - // These descriptors are local to this Collection and their state is thus not - // visible to other transactions. - uncommitted uncommittedDescriptors - - // A collection of descriptors which were read from the store. - kv kvDescriptors + // A mirror of the descriptors in storage. These descriptors are either (1) + // already stored and were read from KV, or (2) have been modified by the + // uncommitted transaction affiliated with this Collection and should be + // written to KV upon commit. + // Source (1) serves as a cache. Source (2) allows a transaction to see its + // own modifications while bypassing the descriptor lease mechanism. The + // lease mechanism will have its own transaction to read the descriptor and + // will hang waiting for the uncommitted changes to the descriptor if this + // transaction is PRIORITY HIGH. These descriptors are local to this + // Collection and their state is thus not visible to other transactions. + stored storedDescriptors // syntheticDescriptors contains in-memory descriptors which override all // other matching descriptors during immutable descriptor resolution (by name @@ -154,7 +154,7 @@ func (tc *Collection) ResetMaxTimestampBound() { tc.maxTimestampBoundDeadlineHolder.maxTimestampBound = hlc.Timestamp{} } -// SkipValidationOnWrite avoids validating uncommitted descriptors prior to +// SkipValidationOnWrite avoids validating stored descriptors prior to // a transaction commit. func (tc *Collection) SkipValidationOnWrite() { tc.skipValidationOnWrite = true @@ -177,8 +177,7 @@ func (tc *Collection) ReleaseLeases(ctx context.Context) { // ReleaseAll calls ReleaseLeases. func (tc *Collection) ReleaseAll(ctx context.Context) { tc.ReleaseLeases(ctx) - tc.uncommitted.reset() - tc.kv.reset(ctx) + tc.stored.reset(ctx) tc.synthetic.reset() tc.deletedDescs = catalog.DescriptorIDSet{} tc.skipValidationOnWrite = false @@ -187,13 +186,13 @@ func (tc *Collection) ReleaseAll(ctx context.Context) { // HasUncommittedTables returns true if the Collection contains uncommitted // tables. func (tc *Collection) HasUncommittedTables() bool { - return tc.uncommitted.hasUncommittedTables() + return tc.stored.hasUncommittedTables() } // HasUncommittedTypes returns true if the Collection contains uncommitted // types. func (tc *Collection) HasUncommittedTypes() bool { - return tc.uncommitted.hasUncommittedTypes() + return tc.stored.hasUncommittedTypes() } // AddUncommittedDescriptor adds an uncommitted descriptor modified in the @@ -205,11 +204,11 @@ func (tc *Collection) HasUncommittedTypes() bool { // will return this exact object. Subsequent attempts to resolve this descriptor // immutably will return a copy of the descriptor in the current state. A deep // copy is performed in this call. -func (tc *Collection) AddUncommittedDescriptor(desc catalog.MutableDescriptor) error { - // Invalidate all the cached descriptors since a stale copy of this may be - // included. - tc.kv.releaseAllDescriptors() - return tc.uncommitted.checkIn(desc) +func (tc *Collection) AddUncommittedDescriptor( + ctx context.Context, desc catalog.MutableDescriptor, +) error { + _, err := tc.stored.add(ctx, desc, checkedOutAtLeastOnce) + return err } // ValidateOnWriteEnabled is the cluster setting used to enable or disable @@ -232,7 +231,7 @@ func (tc *Collection) WriteDescToBatch( return err } } - if err := tc.AddUncommittedDescriptor(desc); err != nil { + if err := tc.AddUncommittedDescriptor(ctx, desc); err != nil { return err } descKey := catalogkeys.MakeDescMetadataKey(tc.codec(), desc.GetID()) @@ -260,7 +259,7 @@ func (tc *Collection) WriteDesc( // returned for each schema change is ClusterVersion - 1, because that's the one // that will be used when checking for table descriptor two version invariance. func (tc *Collection) GetDescriptorsWithNewVersion() (originalVersions []lease.IDVersion) { - _ = tc.uncommitted.iterateNewVersionByID(func(originalVersion lease.IDVersion) error { + _ = tc.stored.iterateNewVersionByID(func(originalVersion lease.IDVersion) error { originalVersions = append(originalVersions, originalVersion) return nil }) @@ -270,7 +269,7 @@ func (tc *Collection) GetDescriptorsWithNewVersion() (originalVersions []lease.I // GetUncommittedTables returns all the tables updated or created in the // transaction. func (tc *Collection) GetUncommittedTables() (tables []catalog.TableDescriptor) { - return tc.uncommitted.getUncommittedTables() + return tc.stored.getUncommittedTables() } func newMutableSyntheticDescriptorAssertionError(id descpb.ID) error { @@ -281,7 +280,7 @@ func newMutableSyntheticDescriptorAssertionError(id descpb.ID) error { // first checking the Collection's cached descriptors for validity if validate // is set to true before defaulting to a key-value scan, if necessary. func (tc *Collection) GetAllDescriptors(ctx context.Context, txn *kv.Txn) (nstree.Catalog, error) { - return tc.kv.getAllDescriptors(ctx, txn, tc.version) + return tc.stored.getAllDescriptors(ctx, txn, tc.version) } // GetAllDatabaseDescriptors returns all database descriptors visible by the @@ -294,7 +293,7 @@ func (tc *Collection) GetAllDatabaseDescriptors( ctx context.Context, txn *kv.Txn, ) ([]catalog.DatabaseDescriptor, error) { vd := tc.newValidationDereferencer(txn) - return tc.kv.getAllDatabaseDescriptors(ctx, tc.version, txn, vd) + return tc.stored.getAllDatabaseDescriptors(ctx, tc.version, txn, vd) } // GetAllTableDescriptorsInDatabase returns all the table descriptors visible to @@ -335,7 +334,7 @@ func (tc *Collection) GetAllTableDescriptorsInDatabase( func (tc *Collection) GetSchemasForDatabase( ctx context.Context, txn *kv.Txn, dbDesc catalog.DatabaseDescriptor, ) (map[descpb.ID]string, error) { - return tc.kv.getSchemasForDatabase(ctx, txn, dbDesc) + return tc.stored.getSchemasForDatabase(ctx, txn, dbDesc) } // GetObjectNamesAndIDs returns the names and IDs of all objects in a database and schema. @@ -416,7 +415,7 @@ func (tc *Collection) RemoveSyntheticDescriptor(id descpb.ID) { } func (tc *Collection) codec() keys.SQLCodec { - return tc.kv.codec + return tc.stored.codec } // AddDeletedDescriptor is temporarily tracking descriptors that have been, diff --git a/pkg/sql/catalog/descs/collection_test.go b/pkg/sql/catalog/descs/collection_test.go index 036400e06bbc..31db715664f6 100644 --- a/pkg/sql/catalog/descs/collection_test.go +++ b/pkg/sql/catalog/descs/collection_test.go @@ -271,7 +271,7 @@ func TestAddUncommittedDescriptorAndMutableResolution(t *testing.T) { require.NoError(t, err) require.Same(t, immByID, immResolvedWithNewNameButHasOldName) - require.NoError(t, descriptors.AddUncommittedDescriptor(mut)) + require.NoError(t, descriptors.AddUncommittedDescriptor(ctx, mut)) immByNameAfter, err := descriptors.GetImmutableDatabaseByName(ctx, txn, "new_name", flags) require.NoError(t, err) @@ -598,7 +598,7 @@ func TestCollectionPreservesPostDeserializationChanges(t *testing.T) { // TestCollectionProperlyUsesMemoryMonitoring ensures that memory monitoring // on Collection is working properly. -// Namely, we are currently only tracking memory usage on Collection.kvDescriptors +// Namely, we are currently only tracking memory usage on Collection.storedDescriptors // since it reads all descriptors from storage, which can be huge. // // The testing strategy is to @@ -610,7 +610,7 @@ func TestCollectionPreservesPostDeserializationChanges(t *testing.T) { // 3. Change the monitor budget to something small. Repeat step 2 and expect an error // being thrown out when reading all those descriptors into memory to validate the // memory monitor indeed kicked in and had an effect. -func TestKVDescriptorsProperlyUsesMemoryMonitoring(t *testing.T) { +func TestCollectionProperlyUsesMemoryMonitoring(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -663,3 +663,123 @@ func TestKVDescriptorsProperlyUsesMemoryMonitoring(t *testing.T) { require.Equal(t, int64(0), monitor.AllocBytes()) monitor.Stop(ctx) } + +// TestDescriptorCache ensures that when descriptors are modified, a batch +// lookup on the Collection views the latest changes. +func TestDescriptorCache(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{}) + defer tc.Stopper().Stop(ctx) + + tdb := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + tdb.Exec(t, `CREATE DATABASE db`) + tdb.Exec(t, `USE db`) + tdb.Exec(t, `CREATE SCHEMA schema`) + tdb.Exec(t, `CREATE TABLE db.schema.table()`) + + s0 := tc.Server(0) + execCfg := s0.ExecutorConfig().(sql.ExecutorConfig) + t.Run("all descriptors", func(t *testing.T) { + require.NoError(t, sql.DescsTxn(ctx, &execCfg, func( + ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, + ) error { + // Warm up cache. + _, err := descriptors.GetAllDescriptors(ctx, txn) + if err != nil { + return err + } + // Modify table descriptor. + tn := tree.MakeTableNameWithSchema("db", "schema", "table") + flags := tree.ObjectLookupFlagsWithRequired() + flags.RequireMutable = true + _, mut, err := descriptors.GetMutableTableByName(ctx, txn, &tn, flags) + if err != nil { + return err + } + require.NotNil(t, mut) + mut.Name = "new_name" + err = descriptors.AddUncommittedDescriptor(ctx, mut) + if err != nil { + return err + } + // The collection's all descriptors should include the modification. + cat, err := descriptors.GetAllDescriptors(ctx, txn) + if err != nil { + return err + } + found := cat.LookupDescriptorEntry(mut.ID) + require.NotEmpty(t, found) + require.Equal(t, found, mut.ImmutableCopy()) + return nil + })) + }) + t.Run("all db descriptors", func(t *testing.T) { + require.NoError(t, sql.DescsTxn(ctx, &execCfg, func( + ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, + ) error { + // Warm up cache. + _, err := descriptors.GetAllDatabaseDescriptors(ctx, txn) + if err != nil { + return err + } + // Modify database descriptor. + flags := tree.DatabaseLookupFlags{} + flags.RequireMutable = true + mut, err := descriptors.GetMutableDatabaseByName(ctx, txn, "db", flags) + if err != nil { + return err + } + require.NotNil(t, mut) + mut.Version += 1 + err = descriptors.AddUncommittedDescriptor(ctx, mut) + if err != nil { + return err + } + // The collection's all database descriptors should reflect the + // modification. + dbDescs, err := descriptors.GetAllDatabaseDescriptors(ctx, txn) + if err != nil { + return err + } + require.Len(t, dbDescs, 4) + require.Equal(t, mut.ImmutableCopy(), dbDescs[0]) + return nil + })) + }) + t.Run("schemas for database", func(t *testing.T) { + require.NoError(t, sql.DescsTxn(ctx, &execCfg, func( + ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, + ) error { + // Warm up cache. + dbDesc, err := descriptors.GetDatabaseDesc(ctx, txn, "db", tree.DatabaseLookupFlags{}) + if err != nil { + return err + } + _, err = descriptors.GetSchemasForDatabase(ctx, txn, dbDesc) + if err != nil { + return err + } + // Modify schema name. + schemaDesc, err := descriptors.GetMutableSchemaByName(ctx, txn, dbDesc, "schema", tree.SchemaLookupFlags{Required: true}) + if err != nil { + return err + } + schemaDesc.SchemaDesc().Name = "new_name" + err = descriptors.AddUncommittedDescriptor(ctx, schemaDesc.(catalog.MutableDescriptor)) + if err != nil { + return err + } + // The collection's schemas for database should reflect the modification. + schemas, err := descriptors.GetSchemasForDatabase(ctx, txn, dbDesc) + if err != nil { + return err + } + require.Len(t, schemas, 2) + require.Equal(t, schemaDesc.GetName(), schemas[schemaDesc.GetID()]) + return nil + })) + }) +} diff --git a/pkg/sql/catalog/descs/descriptor.go b/pkg/sql/catalog/descs/descriptor.go index a21ebd9849c6..8ae6a97f3d47 100644 --- a/pkg/sql/catalog/descs/descriptor.go +++ b/pkg/sql/catalog/descs/descriptor.go @@ -109,7 +109,7 @@ func (tc *Collection) getDescriptorsByID( for _, fn := range []func(id descpb.ID) (catalog.Descriptor, error){ q.lookupVirtual, q.lookupSynthetic, - q.lookupUncommitted, + q.lookupCached, q.lookupLeased, } { for i, id := range ids { @@ -138,13 +138,13 @@ func (tc *Collection) getDescriptorsByID( // No KV lookup necessary, return early. return descs, nil } - kvDescs, err := tc.withReadFromStore(flags.RequireMutable, func() ([]catalog.Descriptor, error) { + kvDescs, err := tc.withReadFromStore(ctx, flags.RequireMutable, func() ([]catalog.Descriptor, error) { ret := make([]catalog.Descriptor, len(remainingIDs)) // Try to re-use any unvalidated descriptors we may have. kvIDs := make([]descpb.ID, 0, len(remainingIDs)) kvIndexes := make([]int, 0, len(remainingIDs)) for i, id := range remainingIDs { - if imm, status := tc.uncommitted.getImmutableByID(id); imm != nil && status == notValidatedYet { + if imm, status := tc.stored.getCachedByID(id); imm != nil && status == notValidatedYet { err := tc.Validate(ctx, txn, catalog.ValidationReadTelemetry, catalog.ValidationLevelCrossReferences, imm) if err != nil { return nil, err @@ -158,7 +158,7 @@ func (tc *Collection) getDescriptorsByID( // Read all others from the store. if len(kvIDs) > 0 { vd := tc.newValidationDereferencer(txn) - kvDescs, err := tc.kv.getByIDs(ctx, tc.version, txn, vd, kvIDs) + kvDescs, err := tc.stored.getByIDs(ctx, tc.version, txn, vd, kvIDs) if err != nil { return nil, err } @@ -224,24 +224,37 @@ func (q *byIDLookupContext) lookupSynthetic(id descpb.ID) (catalog.Descriptor, e return sd, nil } -func (q *byIDLookupContext) lookupUncommitted(id descpb.ID) (_ catalog.Descriptor, err error) { - ud, status := q.tc.uncommitted.getImmutableByID(id) - if ud == nil || status == notValidatedYet { +func (q *byIDLookupContext) lookupCached(id descpb.ID) (_ catalog.Descriptor, err error) { + sd, status := q.tc.stored.getCachedByID(id) + if sd == nil { return nil, nil } - log.VEventf(q.ctx, 2, "found uncommitted descriptor %d", id) + if status == notValidatedYet { + err := q.tc.Validate(q.ctx, q.txn, catalog.ValidationReadTelemetry, catalog.ValidationLevelCrossReferences, sd) + if err != nil { + return nil, err + } + err = q.tc.stored.upgradeToValidated(sd.GetID()) + if err != nil { + return nil, err + } + } + log.VEventf(q.ctx, 2, "found cached descriptor %d", id) + if q.flags.RequireMutable { + sd, err = q.tc.stored.checkOut(id) + if err != nil { + return nil, err + } + } // Hydrate any types in the descriptor if necessary, for uncomitted // descriptors we are going to include offline and get non-cached view. - if tableDesc, isTableDesc := ud.(catalog.TableDescriptor); isTableDesc { - ud, err = q.tc.hydrateTypesInTableDescWithOptions(q.ctx, q.txn, tableDesc, true, true) + if tableDesc, isTableDesc := sd.(catalog.TableDescriptor); isTableDesc { + sd, err = q.tc.hydrateTypesInTableDescWithOptions(q.ctx, q.txn, tableDesc, true, true) if err != nil { return nil, err } } - if !q.flags.RequireMutable { - return ud, nil - } - return q.tc.uncommitted.checkOut(id) + return sd, nil } func (q *byIDLookupContext) lookupLeased(id descpb.ID) (catalog.Descriptor, error) { @@ -253,7 +266,7 @@ func (q *byIDLookupContext) lookupLeased(id descpb.ID) (catalog.Descriptor, erro // // TODO(ajwerner): More generally leverage this set of kv descriptors on // the resolution path. - if q.tc.kv.idDefinitelyDoesNotExist(id) { + if q.tc.stored.idDefinitelyDoesNotExist(id) { return nil, catalog.ErrDescriptorNotFound } desc, shouldReadFromStore, err := q.tc.leased.getByID(q.ctx, q.tc.deadlineHolder(q.txn), id) @@ -321,20 +334,17 @@ func (tc *Collection) getByName( } { - refuseFurtherLookup, ud := tc.uncommitted.getByName(parentID, parentSchemaID, name) + ud := tc.stored.getCachedByName(parentID, parentSchemaID, name) if ud != nil { - log.VEventf(ctx, 2, "found uncommitted descriptor %d", ud.GetID()) + log.VEventf(ctx, 2, "found cached descriptor %d", ud.GetID()) if mutable { - ud, err = tc.uncommitted.checkOut(ud.GetID()) + ud, err = tc.stored.checkOut(ud.GetID()) if err != nil { return false, nil, err } } return true, ud, nil } - if refuseFurtherLookup { - return false, nil, nil - } } if !avoidLeased && !mutable && !lease.TestingTableLeasesAreDisabled() { @@ -349,18 +359,18 @@ func (tc *Collection) getByName( } var descs []catalog.Descriptor - descs, err = tc.withReadFromStore(mutable, func() ([]catalog.Descriptor, error) { + descs, err = tc.withReadFromStore(ctx, mutable, func() ([]catalog.Descriptor, error) { // Try to re-use an unvalidated descriptor if there is one. - if imm := tc.uncommitted.getUnvalidatedByName(parentID, parentSchemaID, name); imm != nil { + if imm := tc.stored.getUnvalidatedByName(parentID, parentSchemaID, name); imm != nil { return []catalog.Descriptor{imm}, tc.Validate(ctx, txn, catalog.ValidationReadTelemetry, catalog.ValidationLevelCrossReferences, imm) } // If not possible, read it from the store. - uncommittedParent, _ := tc.uncommitted.getImmutableByID(parentID) + uncommittedParent, _ := tc.stored.getCachedByID(parentID) uncommittedDB, _ := catalog.AsDatabaseDescriptor(uncommittedParent) version := tc.settings.Version.ActiveVersion(ctx) vd := tc.newValidationDereferencer(txn) - imm, err := tc.kv.getByName(ctx, version, txn, vd, uncommittedDB, parentID, parentSchemaID, name) + imm, err := tc.stored.getByName(ctx, version, txn, vd, uncommittedDB, parentID, parentSchemaID, name) if err != nil { return nil, err } @@ -373,11 +383,11 @@ func (tc *Collection) getByName( } // withReadFromStore updates the state of the Collection, especially its -// uncommitted descriptors layer, after reading a descriptor from the storage +// stored descriptors layer, after reading a descriptor from the storage // layer. The logic is the same regardless of whether the descriptor was read // by name or by ID. func (tc *Collection) withReadFromStore( - requireMutable bool, readFn func() ([]catalog.Descriptor, error), + ctx context.Context, requireMutable bool, readFn func() ([]catalog.Descriptor, error), ) (descs []catalog.Descriptor, _ error) { descs, err := readFn() if err != nil { @@ -387,12 +397,12 @@ func (tc *Collection) withReadFromStore( if desc == nil { continue } - desc, err = tc.uncommitted.add(desc.NewBuilder().BuildExistingMutable(), notCheckedOutYet) + desc, err = tc.stored.add(ctx, desc.NewBuilder().BuildExistingMutable(), notCheckedOutYet) if err != nil { return nil, err } if requireMutable { - desc, err = tc.uncommitted.checkOut(desc.GetID()) + desc, err = tc.stored.checkOut(desc.GetID()) if err != nil { return nil, err } @@ -453,7 +463,7 @@ func getSchemaByName( if isDone, sc := tc.temporary.getSchemaByName(ctx, db.GetID(), name); sc != nil || isDone { return sc != nil, sc, nil } - scID, err := tc.kv.lookupName(ctx, txn, nil /* maybeDB */, db.GetID(), keys.RootNamespaceID, name) + scID, err := tc.stored.lookupName(ctx, txn, nil /* maybeDB */, db.GetID(), keys.RootNamespaceID, name) if err != nil || scID == descpb.InvalidID { return false, nil, err } diff --git a/pkg/sql/catalog/descs/hydrate.go b/pkg/sql/catalog/descs/hydrate.go index 0393212f0269..50e8a2ed3621 100644 --- a/pkg/sql/catalog/descs/hydrate.go +++ b/pkg/sql/catalog/descs/hydrate.go @@ -134,7 +134,7 @@ func (tc *Collection) hydrateTypesInTableDescWithOptions( // TODO(ajwerner): Consider surfacing the mechanism used to retrieve the // descriptor up to this layer. if tc.hydratedTables != nil && - tc.uncommitted.descs.GetByID(desc.GetID()) == nil && + tc.stored.descs.GetByID(desc.GetID()) == nil && tc.synthetic.descs.GetByID(desc.GetID()) == nil { hydrated, err := tc.hydratedTables.GetHydratedTableDescriptor(ctx, t, getType) if err != nil { diff --git a/pkg/sql/catalog/descs/kv_descriptors.go b/pkg/sql/catalog/descs/kv_descriptors.go deleted file mode 100644 index f4161a8a934a..000000000000 --- a/pkg/sql/catalog/descs/kv_descriptors.go +++ /dev/null @@ -1,345 +0,0 @@ -// Copyright 2021 The Cockroach Authors. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -package descs - -import ( - "context" - - "github.com/cockroachdb/cockroach/pkg/clusterversion" - "github.com/cockroachdb/cockroach/pkg/keys" - "github.com/cockroachdb/cockroach/pkg/kv" - "github.com/cockroachdb/cockroach/pkg/sql/catalog" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/dbdesc" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/internal/catkv" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/internal/validate" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/nstree" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/resolver" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" - "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/mon" - "github.com/cockroachdb/errors" -) - -type kvDescriptors struct { - codec keys.SQLCodec - - // systemNamespace is a cache of system table namespace entries. We assume - // these are immutable for the life of the process. - systemNamespace *systemDatabaseNamespaceCache - - // allDescriptors is a slice of all available descriptors. The descriptors - // are cached to avoid repeated lookups by users like virtual tables. The - // cache is purged whenever events would cause a scan of all descriptors to - // return different values, such as when the txn timestamp changes or when - // new descriptors are written in the txn. - // - // TODO(ajwerner): This cache may be problematic in clusters with very large - // numbers of descriptors. - allDescriptors allDescriptors - - // allDatabaseDescriptors is a slice of all available database descriptors. - // These are purged at the same time as allDescriptors. - allDatabaseDescriptors []catalog.DatabaseDescriptor - - // allSchemasForDatabase maps databaseID -> schemaID -> schemaName. - // For each databaseID, all schemas visible under the database can be - // observed. - // These are purged at the same time as allDescriptors. - allSchemasForDatabase map[descpb.ID]map[descpb.ID]string - - // memAcc is the actual account of an injected, upstream monitor - // to track memory usage of kvDescriptors. - memAcc mon.BoundAccount -} - -// allDescriptors is an abstraction to capture the complete set of descriptors -// read from the store. It is used to accelerate repeated invocations of virtual -// tables which utilize descriptors. It tends to get used to build a -// sql.internalLookupCtx. -// -// TODO(ajwerner): Memory monitoring. -// TODO(ajwerner): Unify this struct with the uncommittedDescriptors set. -// TODO(ajwerner): Unify the sql.internalLookupCtx with the descs.Collection. -type allDescriptors struct { - c nstree.Catalog -} - -func (d *allDescriptors) init(c nstree.Catalog) { - d.c = c -} - -func (d *allDescriptors) clear() { - *d = allDescriptors{} -} - -func (d *allDescriptors) isUnset() bool { - return !d.c.IsInitialized() -} - -func (d *allDescriptors) contains(id descpb.ID) bool { - return d.c.IsInitialized() && d.c.LookupDescriptorEntry(id) != nil -} - -func makeKVDescriptors( - codec keys.SQLCodec, systemNamespace *systemDatabaseNamespaceCache, monitor *mon.BytesMonitor, -) kvDescriptors { - return kvDescriptors{ - codec: codec, - systemNamespace: systemNamespace, - memAcc: monitor.MakeBoundAccount(), - } -} - -func (kd *kvDescriptors) reset(ctx context.Context) { - kd.releaseAllDescriptors() - kd.memAcc.Clear(ctx) -} - -// releaseAllDescriptors releases the cached slice of all descriptors -// held by Collection. -// -// TODO(ajwerner): Make this unnecessary by ensuring that all writes properly -// interact with this layer. -func (kd *kvDescriptors) releaseAllDescriptors() { - kd.allDescriptors.clear() - kd.allDatabaseDescriptors = nil - kd.allSchemasForDatabase = nil -} - -// lookupName is used when reading a descriptor from the storage layer by name. -// Descriptors are physically keyed by ID, so we need to resolve their ID by -// querying the system.namespace table first, which is what this method does. -// We can avoid having to do this in some special cases: -// - When the descriptor name and ID are hard-coded. This is the case for the -// system database and for the tables in it. -// - When we're looking up a schema for which we already have the descriptor -// of the parent database. The schema ID can be looked up in it. -// -func (kd *kvDescriptors) lookupName( - ctx context.Context, - txn *kv.Txn, - maybeDB catalog.DatabaseDescriptor, - parentID descpb.ID, - parentSchemaID descpb.ID, - name string, -) (id descpb.ID, err error) { - // Handle special cases which might avoid a namespace table query. - switch parentID { - case descpb.InvalidID: - if name == systemschema.SystemDatabaseName { - // Special case: looking up the system database. - // The system database's descriptor ID is hard-coded. - return keys.SystemDatabaseID, nil - } - case keys.SystemDatabaseID: - // Special case: looking up something in the system database. - // Those namespace table entries are cached. - id = kd.systemNamespace.lookup(parentSchemaID, name) - if id != descpb.InvalidID { - return id, err - } - // Make sure to cache the result if we had to look it up. - defer func() { - if err == nil && id != descpb.InvalidID { - kd.systemNamespace.add(descpb.NameInfo{ - ParentID: keys.SystemDatabaseID, - ParentSchemaID: parentSchemaID, - Name: name, - }, id) - } - }() - default: - if parentSchemaID == descpb.InvalidID { - // At this point we know that parentID is not zero, so a zero - // parentSchemaID means we're looking up a schema. - if maybeDB != nil { - // Special case: looking up a schema, but in a database which we already - // have the descriptor for. We find the schema ID in there. - id := maybeDB.GetSchemaID(name) - return id, nil - } - } - } - // Fall back to querying the namespace table. - return catkv.LookupID( - ctx, txn, kd.codec, parentID, parentSchemaID, name, - ) -} - -// getByName reads a descriptor from the storage layer by name. -// -// This is a three-step process: -// 1. resolve the descriptor's ID using the name information, -// 2. actually read the descriptor from storage, -// 3. check that the name in the descriptor is the one we expect; meaning that -// there is no RENAME underway for instance. -// -func (kd *kvDescriptors) getByName( - ctx context.Context, - version clusterversion.ClusterVersion, - txn *kv.Txn, - vd validate.ValidationDereferencer, - maybeDB catalog.DatabaseDescriptor, - parentID descpb.ID, - parentSchemaID descpb.ID, - name string, -) (catalog.Descriptor, error) { - descID, err := kd.lookupName(ctx, txn, maybeDB, parentID, parentSchemaID, name) - if err != nil || descID == descpb.InvalidID { - return nil, err - } - descs, err := kd.getByIDs(ctx, version, txn, vd, []descpb.ID{descID}) - if err != nil { - if errors.Is(err, catalog.ErrDescriptorNotFound) { - // Having done the namespace lookup, the descriptor must exist. - return nil, errors.WithAssertionFailure(err) - } - return nil, err - } - if descs[0].GetName() != name { - // Immediately after a RENAME an old name still points to the descriptor - // during the drain phase for the name. Do not return a descriptor during - // draining. - // - // TODO(postamar): remove this after 22.1 is release. - // At that point, draining names will no longer have to be supported. - // We can then consider making the descriptor collection aware of - // uncommitted namespace operations. - return nil, nil - } - return descs[0], nil -} - -// getByIDs actually reads a batch of descriptors from the storage layer. -func (kd *kvDescriptors) getByIDs( - ctx context.Context, - version clusterversion.ClusterVersion, - txn *kv.Txn, - vd validate.ValidationDereferencer, - ids []descpb.ID, -) ([]catalog.Descriptor, error) { - ret := make([]catalog.Descriptor, len(ids)) - kvIDs := make([]descpb.ID, 0, len(ids)) - indexes := make([]int, 0, len(ids)) - for i, id := range ids { - if id == keys.SystemDatabaseID { - // Special handling for the system database descriptor. - // - // This is done for performance reasons, to save ourselves an unnecessary - // round trip to storage which otherwise quickly compounds. - // - // The system database descriptor should never actually be mutated, which is - // why we return the same hard-coded descriptor every time. It's assumed - // that callers of this method will check the privileges on the descriptor - // (like any other database) and return an error. - ret[i] = dbdesc.NewBuilder(systemschema.SystemDB.DatabaseDesc()).BuildExistingMutable() - } else { - kvIDs = append(kvIDs, id) - indexes = append(indexes, i) - } - } - if len(kvIDs) == 0 { - return ret, nil - } - kvDescs, err := catkv.MustGetDescriptorsByID(ctx, version, kd.codec, txn, vd, kvIDs, catalog.Any) - if err != nil { - return nil, err - } - for j, desc := range kvDescs { - ret[indexes[j]] = desc - } - return ret, nil -} - -func (kd *kvDescriptors) getAllDescriptors( - ctx context.Context, txn *kv.Txn, version clusterversion.ClusterVersion, -) (nstree.Catalog, error) { - if kd.allDescriptors.isUnset() { - c, err := catkv.GetCatalogUnvalidated(ctx, kd.codec, txn) - if err != nil { - return nstree.Catalog{}, err - } - - // Monitor memory usage of the catalog recently read from storage. - if err := kd.memAcc.Grow(ctx, c.ByteSize()); err != nil { - err = errors.Wrap(err, "Memory usage exceeds limit for kvDescriptors.allDescriptors.") - return nstree.Catalog{}, err - } - - descs := c.OrderedDescriptors() - ve := c.Validate(ctx, version, catalog.ValidationReadTelemetry, catalog.ValidationLevelCrossReferences, descs...) - if err := ve.CombinedError(); err != nil { - return nstree.Catalog{}, err - } - - // There could be tables with user defined types that need hydrating. - if err := HydrateGivenDescriptors(ctx, descs); err != nil { - // If we ran into an error hydrating the types, that means that we - // have some sort of corrupted descriptor state. Rather than disable - // uses of getAllDescriptors, just log the error. - log.Errorf(ctx, "%s", err.Error()) - } - - kd.allDescriptors.init(c) - } - return kd.allDescriptors.c, nil -} - -func (kd *kvDescriptors) getAllDatabaseDescriptors( - ctx context.Context, - version clusterversion.ClusterVersion, - txn *kv.Txn, - vd validate.ValidationDereferencer, -) ([]catalog.DatabaseDescriptor, error) { - if kd.allDatabaseDescriptors == nil { - c, err := catkv.GetAllDatabaseDescriptorIDs(ctx, txn, kd.codec) - if err != nil { - return nil, err - } - dbDescs, err := catkv.MustGetDescriptorsByID(ctx, version, kd.codec, txn, vd, c.OrderedDescriptorIDs(), catalog.Database) - if err != nil { - return nil, err - } - kd.allDatabaseDescriptors = make([]catalog.DatabaseDescriptor, len(dbDescs)) - for i, dbDesc := range dbDescs { - kd.allDatabaseDescriptors[i] = dbDesc.(catalog.DatabaseDescriptor) - } - } - return kd.allDatabaseDescriptors, nil -} - -func (kd *kvDescriptors) getSchemasForDatabase( - ctx context.Context, txn *kv.Txn, db catalog.DatabaseDescriptor, -) (map[descpb.ID]string, error) { - if kd.allSchemasForDatabase == nil { - kd.allSchemasForDatabase = make(map[descpb.ID]map[descpb.ID]string) - } - if _, ok := kd.allSchemasForDatabase[db.GetID()]; !ok { - var err error - allSchemas, err := resolver.GetForDatabase(ctx, txn, kd.codec, db) - if err != nil { - return nil, err - } - kd.allSchemasForDatabase[db.GetID()] = make(map[descpb.ID]string) - for id, entry := range allSchemas { - kd.allSchemasForDatabase[db.GetID()][id] = entry.Name - } - } - return kd.allSchemasForDatabase[db.GetID()], nil -} - -func (kd *kvDescriptors) idDefinitelyDoesNotExist(id descpb.ID) bool { - if kd.allDescriptors.isUnset() { - return false - } - return !kd.allDescriptors.contains(id) -} diff --git a/pkg/sql/catalog/descs/stored_descriptors.go b/pkg/sql/catalog/descs/stored_descriptors.go new file mode 100644 index 000000000000..29c46e9fa65f --- /dev/null +++ b/pkg/sql/catalog/descs/stored_descriptors.go @@ -0,0 +1,732 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package descs + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/dbdesc" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/internal/catkv" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/internal/validate" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/nstree" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/resolver" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc" + "github.com/cockroachdb/cockroach/pkg/util/iterutil" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/mon" + "github.com/cockroachdb/errors" +) + +// storedDescriptorStatus is the status of a stored descriptor. +type storedDescriptorStatus int + +const ( + // notValidatedYet designates descriptors which have been read from + // storage but have not been validated yet. Until they're validated they + // cannot be used for anything else than validating other descriptors. + notValidatedYet storedDescriptorStatus = iota + + // notCheckedOutYet designates descriptors which have been properly read from + // storage and have been validated but have never been checked out yet. This + // means that the mutable and immutable descriptor protos are known to be the + // exact same. + notCheckedOutYet + + // checkedOutAtLeastOnce designates descriptors which have been checked out at + // least once. Newly-created descriptors are considered to have been checked + // out as well. + checkedOutAtLeastOnce +) + +// storedDescriptor is a descriptor that has been cached after being read +// and/or being modified in the current transaction. +type storedDescriptor struct { + + // immutable holds the descriptor as it was when this struct was initialized, + // either after being read from storage or after being checked in. + immutable catalog.Descriptor + + // mutable is initialized as a mutable copy of immutable when the descriptor + // is read from storage. + // This value might be nil in some rare cases where we completely bypass + // storage for performance reasons because the descriptor is guaranteed to + // never change. Such is the case of the system database descriptor for + // instance. + // This value should not make its way outside the storedDescriptors + // other than via checkOut. + mutable catalog.MutableDescriptor + + // storedDescriptorStatus describes the status of the mutable and + // immutable descriptors + storedDescriptorStatus +} + +// GetName implements the catalog.NameEntry interface. +func (u *storedDescriptor) GetName() string { + return u.immutable.GetName() +} + +// GetParentID implements the catalog.NameEntry interface. +func (u *storedDescriptor) GetParentID() descpb.ID { + return u.immutable.GetParentID() +} + +// GetParentSchemaID implements the catalog.NameEntry interface. +func (u storedDescriptor) GetParentSchemaID() descpb.ID { + return u.immutable.GetParentSchemaID() +} + +// GetID implements the catalog.NameEntry interface. +func (u storedDescriptor) GetID() descpb.ID { + return u.immutable.GetID() +} + +// checkOut is how the mutable descriptor should be accessed. +func (u *storedDescriptor) checkOut() catalog.MutableDescriptor { + if u.mutable == nil { + // This special case is allowed for certain system descriptors which + // for performance reasons are never actually read from storage, instead + // we use a copy of the descriptor hard-coded in the system schema used for + // bootstrapping a cluster. + // + // This implies that these descriptors never undergo any changes and + // therefore checking out a mutable descriptor is pointless for the most + // part. This may nonetheless legitimately happen during migrations + // which change all descriptors somehow, so we need to support this. + return u.immutable.NewBuilder().BuildExistingMutable() + } + u.storedDescriptorStatus = checkedOutAtLeastOnce + return u.mutable +} + +var _ catalog.NameEntry = (*storedDescriptor)(nil) + +// storedDescriptors is the data structure holding all +// storedDescriptor objects for a Collection. +// +// Immutable descriptors can be freely looked up. +// Mutable descriptors can be: +// 1. added into it, +// 2. checked out of it, +// 3. checked back in to it. +// +// An error will be triggered by: +// - checking out a mutable descriptor that hasn't yet been added, +// - checking in a descriptor that has been added but not yet checked out, +// - any checked-out-but-not-checked-in mutable descriptors at commit time. +// +type storedDescriptors struct { + codec keys.SQLCodec + + // A mirror of the descriptors in storage. These descriptors are either (1) + // already stored and were read from KV, or (2) have been modified by the + // uncommitted transaction affiliated with this Collection and should be + // written to KV upon commit. + // Source (1) serves as a cache. Source (2) allows a transaction to see its + // own modifications while bypassing the descriptor lease mechanism. The + // lease mechanism will have its own transaction to read the descriptor and + // will hang waiting for the uncommitted changes to the descriptor if this + // transaction is PRIORITY HIGH. These descriptors are local to this + // Collection and their state is thus not visible to other transactions. + descs nstree.Map + + // addedSystemDatabase is used to mark whether the optimization to add the + // system database to the set of stored descriptors has occurred. + addedSystemDatabase bool + + // systemNamespace is a cache of system table namespace entries. We assume + // these are immutable for the life of the process. + systemNamespace *systemDatabaseNamespaceCache + + // hasAll* indicates previously completed range storage lookups. When set, we + // know these descriptors are cached in the map. + hasAllDescriptors bool + hasAllDatabaseDescriptors bool + + // allSchemasForDatabase maps databaseID -> schemaID -> schemaName. + // For each databaseID, all schemas visible under the database can be + // observed. + // These are read from store, which means they may not be up to date if + // modifications have been made. The freshest schemas should be in the map + // above. + allSchemasForDatabase map[descpb.ID]map[descpb.ID]string + + // memAcc is the actual account of an injected, upstream monitor + // to track memory usage of storedDescriptors. + memAcc mon.BoundAccount +} + +func makeStoredDescriptors( + codec keys.SQLCodec, systemNamespace *systemDatabaseNamespaceCache, monitor *mon.BytesMonitor, +) storedDescriptors { + return storedDescriptors{ + codec: codec, + systemNamespace: systemNamespace, + memAcc: monitor.MakeBoundAccount(), + } +} + +func (sd *storedDescriptors) reset(ctx context.Context) { + sd.descs.Clear() + sd.addedSystemDatabase = false + sd.hasAllDescriptors = false + sd.hasAllDatabaseDescriptors = false + sd.allSchemasForDatabase = nil + sd.memAcc.Clear(ctx) +} + +// add adds a descriptor to the set of stored descriptors and returns +// an immutable copy of that descriptor. +func (sd *storedDescriptors) add( + ctx context.Context, mut catalog.MutableDescriptor, status storedDescriptorStatus, +) (catalog.Descriptor, error) { + uNew, err := makeStoredDescriptor(mut, status) + if err != nil { + return nil, err + } + if prev, ok := sd.descs.GetByID(mut.GetID()).(*storedDescriptor); ok { + if prev.mutable.OriginalVersion() != mut.OriginalVersion() { + return nil, errors.AssertionFailedf( + "cannot add a version of descriptor with a different original version" + + " than it was previously added with") + } + } else { + if err := sd.memAcc.Grow(ctx, mut.ByteSize()); err != nil { + err = errors.Wrap(err, "Memory usage exceeds limit for storedDescriptors.descs") + return nil, err + } + } + sd.descs.Upsert(uNew) + return uNew.immutable, err +} + +// checkOut checks out a stored mutable descriptor for use in the +// transaction. This descriptor should later be checked in again. +func (sd *storedDescriptors) checkOut(id descpb.ID) (_ catalog.MutableDescriptor, err error) { + defer func() { + err = errors.NewAssertionErrorWithWrappedErrf( + err, "cannot check out stored descriptor with ID %d", id, + ) + }() + if id == keys.SystemDatabaseID { + sd.maybeAddSystemDatabase() + } + entry := sd.descs.GetByID(id) + if entry == nil { + return nil, errors.New("descriptor hasn't been added yet") + } + u := entry.(*storedDescriptor) + if u.storedDescriptorStatus == notValidatedYet { + return nil, errors.New("descriptor hasn't been validated yet") + } + return u.checkOut(), nil +} + +// checkIn checks in a stored mutable descriptor that was previously +// checked out. +func (sd *storedDescriptors) checkIn(mut catalog.MutableDescriptor) error { + uNew, err := makeStoredDescriptor(mut, checkedOutAtLeastOnce) + if err != nil { + return err + } + if prev := sd.descs.GetByID(mut.GetID()); prev != nil { + return errors.AssertionFailedf("cannot check in a descriptor that has not been previously checked out") + } + sd.descs.Upsert(uNew) + return err +} + +// upgradeToValidated upgrades a stored descriptor that was previously +// unvalidated to not checked out yet. +func (sd *storedDescriptors) upgradeToValidated(id descpb.ID) (err error) { + defer func() { + err = errors.NewAssertionErrorWithWrappedErrf( + err, "cannot upgrade stored descriptor with ID %d", id, + ) + }() + entry := sd.descs.GetByID(id) + if entry == nil { + return errors.New("descriptor has not been cached") + } + storedDesc := entry.(*storedDescriptor) + if storedDesc.storedDescriptorStatus == checkedOutAtLeastOnce { + return errors.New("descriptor has already been checked out") + } + storedDesc.storedDescriptorStatus = notCheckedOutYet + return nil +} + +func makeStoredDescriptor( + desc catalog.MutableDescriptor, status storedDescriptorStatus, +) (*storedDescriptor, error) { + version := desc.GetVersion() + origVersion := desc.OriginalVersion() + if version != origVersion && version != origVersion+1 { + return nil, errors.AssertionFailedf( + "descriptor %d version %d not compatible with cluster version %d", + desc.GetID(), version, origVersion) + } + + mutable, err := maybeRefreshCachedFieldsOnTypeDescriptor(desc) + if err != nil { + return nil, err + } + + return &storedDescriptor{ + mutable: mutable, + immutable: mutable.ImmutableCopy(), + storedDescriptorStatus: status, + }, nil +} + +// maybeRefreshCachedFieldsOnTypeDescriptor refreshes the cached fields on a +// Mutable if the given descriptor is a type descriptor and works as a pass +// through for all other descriptors. Mutable type descriptors are refreshed to +// reconstruct enumMetadata. This ensures that tables hydration following a +// type descriptor update (in the same txn) happens using the modified fields. +func maybeRefreshCachedFieldsOnTypeDescriptor( + desc catalog.MutableDescriptor, +) (catalog.MutableDescriptor, error) { + typeDesc, ok := desc.(catalog.TypeDescriptor) + if ok { + return typedesc.UpdateCachedFieldsOnModifiedMutable(typeDesc) + } + return desc, nil +} + +// getCachedByID looks up a cached descriptor by ID. +func (sd *storedDescriptors) getCachedByID( + id descpb.ID, +) (catalog.Descriptor, storedDescriptorStatus) { + if id == keys.SystemDatabaseID { + sd.maybeAddSystemDatabase() + } + entry := sd.descs.GetByID(id) + if entry == nil { + return nil, notValidatedYet + } + u := entry.(*storedDescriptor) + return u.immutable, u.storedDescriptorStatus +} + +// getCachedByName looks up a cached descriptor by name. +func (sd *storedDescriptors) getCachedByName( + dbID descpb.ID, schemaID descpb.ID, name string, +) catalog.Descriptor { + if dbID == 0 && schemaID == 0 && name == systemschema.SystemDatabaseName { + sd.maybeAddSystemDatabase() + } + // Walk latest to earliest so that a DROP followed by a CREATE with the same + // name will result in the CREATE being seen. + if got := sd.descs.GetByName(dbID, schemaID, name); got != nil { + u := got.(*storedDescriptor) + if u.storedDescriptorStatus == notValidatedYet { + return nil + } + return u.immutable + } + return nil +} + +// getUnvalidatedByName looks up an unvalidated descriptor by name. +func (sd *storedDescriptors) getUnvalidatedByName( + dbID descpb.ID, schemaID descpb.ID, name string, +) catalog.Descriptor { + if dbID == 0 && schemaID == 0 && name == systemschema.SystemDatabaseName { + sd.maybeAddSystemDatabase() + } + entry := sd.descs.GetByName(dbID, schemaID, name) + if entry == nil { + return nil + } + u := entry.(*storedDescriptor) + if u.storedDescriptorStatus != notValidatedYet { + return nil + } + return u.immutable +} + +// getAllDescriptors looks up all descriptors. The first call must go to KV. +// Subsequent calls can retrieve descriptors from the cache. +func (sd *storedDescriptors) getAllDescriptors( + ctx context.Context, txn *kv.Txn, version clusterversion.ClusterVersion, +) (nstree.Catalog, error) { + if !sd.hasAllDescriptors { + c, err := catkv.GetCatalogUnvalidated(ctx, sd.codec, txn) + if err != nil { + return nstree.Catalog{}, err + } + descs := c.OrderedDescriptors() + ve := c.Validate(ctx, version, catalog.ValidationReadTelemetry, catalog.ValidationLevelCrossReferences, descs...) + if err := ve.CombinedError(); err != nil { + return nstree.Catalog{}, err + } + if err := c.ForEachDescriptorEntry(func(desc catalog.Descriptor) error { + if sd.descs.GetByID(desc.GetID()) == nil { + // Cache this descriptor. + mut := desc.NewBuilder().BuildExistingMutable() + _, err := sd.add(ctx, mut, notCheckedOutYet) + if err != nil { + return err + } + } + return nil + }); err != nil { + return nstree.Catalog{}, err + } + sd.hasAllDescriptors = true + sd.hasAllDatabaseDescriptors = true + } + // TODO(jchan): There's a lot of wasted effort here in reconstructing a + // catalog and hydrating every descriptor. We could do better by caching the + // catalog and invalidating it upon descriptor check-out. + cat := nstree.MutableCatalog{} + if err := sd.descs.IterateByID(func(entry catalog.NameEntry) error { + var desc catalog.Descriptor + sDesc := entry.(*storedDescriptor) + if sDesc.storedDescriptorStatus == checkedOutAtLeastOnce { + desc = sDesc.mutable.ImmutableCopy() + } else { + desc = sDesc.immutable + } + cat.UpsertDescriptorEntry(desc) + return nil + }); err != nil { + return nstree.Catalog{}, err + } + // There could be tables with user defined types that need hydrating. + if err := HydrateGivenDescriptors(ctx, cat.OrderedDescriptors()); err != nil { + // If we ran into an error hydrating the types, that means that we + // have some sort of corrupted descriptor state. Rather than disable + // uses of getAllDescriptors, just log the error. + log.Errorf(ctx, "%s", err.Error()) + } + return cat.Catalog, nil +} + +// getAllDatabaseDescriptors looks up all database descriptors. The first call +// must go to KV, except if getAllDescriptors has been called. Subsequent calls +// can retrieve descriptors from the cache. +func (sd *storedDescriptors) getAllDatabaseDescriptors( + ctx context.Context, + version clusterversion.ClusterVersion, + txn *kv.Txn, + vd validate.ValidationDereferencer, +) ([]catalog.DatabaseDescriptor, error) { + if !sd.hasAllDescriptors && !sd.hasAllDatabaseDescriptors { + c, err := catkv.GetAllDatabaseDescriptorIDs(ctx, txn, sd.codec) + if err != nil { + return nil, err + } + dbDescs, err := catkv.MustGetDescriptorsByID(ctx, version, sd.codec, txn, vd, c.OrderedDescriptorIDs(), catalog.Database) + if err != nil { + return nil, err + } + for _, desc := range dbDescs { + if sd.descs.GetByID(desc.GetID()) == nil { + mut := desc.NewBuilder().BuildExistingMutable() + _, err := sd.add(ctx, mut, notCheckedOutYet) + if err != nil { + return nil, err + } + } + } + sd.hasAllDatabaseDescriptors = true + } + var allDatabaseDescriptors []catalog.DatabaseDescriptor + if err := sd.descs.IterateDatabasesByName(func(entry catalog.NameEntry) error { + sDesc := entry.(*storedDescriptor) + var dbDesc catalog.DatabaseDescriptor + if sDesc.storedDescriptorStatus == checkedOutAtLeastOnce { + dbDesc = sDesc.mutable.ImmutableCopy().(catalog.DatabaseDescriptor) + } else { + dbDesc = sDesc.immutable.(catalog.DatabaseDescriptor) + } + allDatabaseDescriptors = append(allDatabaseDescriptors, dbDesc) + return nil + }); err != nil { + return nil, err + } + return allDatabaseDescriptors, nil +} + +func (sd *storedDescriptors) getSchemasForDatabase( + ctx context.Context, txn *kv.Txn, db catalog.DatabaseDescriptor, +) (map[descpb.ID]string, error) { + if sd.allSchemasForDatabase == nil { + sd.allSchemasForDatabase = make(map[descpb.ID]map[descpb.ID]string) + } + if _, ok := sd.allSchemasForDatabase[db.GetID()]; !ok { + var err error + allSchemas, err := resolver.GetForDatabase(ctx, txn, sd.codec, db) + if err != nil { + return nil, err + } + sd.allSchemasForDatabase[db.GetID()] = make(map[descpb.ID]string) + for id, entry := range allSchemas { + sd.allSchemasForDatabase[db.GetID()][id] = entry.Name + } + return sd.allSchemasForDatabase[db.GetID()], nil + } + schemasForDatabase := sd.allSchemasForDatabase[db.GetID()] + if err := sd.descs.IterateSchemasForDatabaseByName(db.GetID(), func(entry catalog.NameEntry) error { + sDesc := entry.(*storedDescriptor) + var schemaDesc catalog.Descriptor + if sDesc.storedDescriptorStatus == checkedOutAtLeastOnce { + schemaDesc = sDesc.mutable.ImmutableCopy() + } else { + schemaDesc = sDesc.immutable + } + schemasForDatabase[schemaDesc.GetID()] = schemaDesc.GetName() + return nil + }); err != nil { + return nil, err + } + return schemasForDatabase, nil +} + +func (sd *storedDescriptors) idDefinitelyDoesNotExist(id descpb.ID) bool { + if !sd.hasAllDescriptors { + return false + } + return sd.descs.GetByID(id) == nil +} + +// lookupName is used when reading a descriptor from the storage layer by name. +// Descriptors are physically keyed by ID, so we need to resolve their ID by +// querying the system.namespace table first, which is what this method does. +// We can avoid having to do this in some special cases: +// - When the descriptor name and ID are hard-coded. This is the case for the +// system database and for the tables in it. +// - When we're looking up a schema for which we already have the descriptor +// of the parent database. The schema ID can be looked up in it. +// +func (sd *storedDescriptors) lookupName( + ctx context.Context, + txn *kv.Txn, + maybeDB catalog.DatabaseDescriptor, + parentID descpb.ID, + parentSchemaID descpb.ID, + name string, +) (id descpb.ID, err error) { + // Handle special cases which might avoid a namespace table query. + switch parentID { + case descpb.InvalidID: + if name == systemschema.SystemDatabaseName { + // Special case: looking up the system database. + // The system database's descriptor ID is hard-coded. + return keys.SystemDatabaseID, nil + } + case keys.SystemDatabaseID: + // Special case: looking up something in the system database. + // Those namespace table entries are cached. + id = sd.systemNamespace.lookup(parentSchemaID, name) + if id != descpb.InvalidID { + return id, err + } + // Make sure to cache the result if we had to look it up. + defer func() { + if err == nil && id != descpb.InvalidID { + sd.systemNamespace.add(descpb.NameInfo{ + ParentID: keys.SystemDatabaseID, + ParentSchemaID: parentSchemaID, + Name: name, + }, id) + } + }() + default: + if parentSchemaID == descpb.InvalidID { + // At this point we know that parentID is not zero, so a zero + // parentSchemaID means we're looking up a schema. + if maybeDB != nil { + // Special case: looking up a schema, but in a database which we already + // have the descriptor for. We find the schema ID in there. + id := maybeDB.GetSchemaID(name) + return id, nil + } + } + } + // Fall back to querying the namespace table. + return catkv.LookupID( + ctx, txn, sd.codec, parentID, parentSchemaID, name, + ) +} + +// getByName reads a descriptor from the storage layer by name. +// +// This is a three-step process: +// 1. resolve the descriptor's ID using the name information, +// 2. actually read the descriptor from storage, +// 3. check that the name in the descriptor is the one we expect; meaning that +// there is no RENAME underway for instance. +// +func (sd *storedDescriptors) getByName( + ctx context.Context, + version clusterversion.ClusterVersion, + txn *kv.Txn, + vd validate.ValidationDereferencer, + maybeDB catalog.DatabaseDescriptor, + parentID descpb.ID, + parentSchemaID descpb.ID, + name string, +) (catalog.Descriptor, error) { + descID, err := sd.lookupName(ctx, txn, maybeDB, parentID, parentSchemaID, name) + if err != nil || descID == descpb.InvalidID { + return nil, err + } + descs, err := sd.getByIDs(ctx, version, txn, vd, []descpb.ID{descID}) + if err != nil { + if errors.Is(err, catalog.ErrDescriptorNotFound) { + // Having done the namespace lookup, the descriptor must exist. + return nil, errors.WithAssertionFailure(err) + } + return nil, err + } + if descs[0].GetName() != name { + // Immediately after a RENAME an old name still points to the descriptor + // during the drain phase for the name. Do not return a descriptor during + // draining. + // + // TODO(postamar): remove this after 22.1 is release. + // At that point, draining names will no longer have to be supported. + // We can then consider making the descriptor collection aware of + // uncommitted namespace operations. + return nil, nil + } + return descs[0], nil +} + +// getByIDs actually reads a batch of descriptors from the storage layer. +func (sd *storedDescriptors) getByIDs( + ctx context.Context, + version clusterversion.ClusterVersion, + txn *kv.Txn, + vd validate.ValidationDereferencer, + ids []descpb.ID, +) ([]catalog.Descriptor, error) { + ret := make([]catalog.Descriptor, len(ids)) + kvIDs := make([]descpb.ID, 0, len(ids)) + indexes := make([]int, 0, len(ids)) + for i, id := range ids { + if id == keys.SystemDatabaseID { + // Special handling for the system database descriptor. + // + // This is done for performance reasons, to save ourselves an unnecessary + // round trip to storage which otherwise quickly compounds. + // + // The system database descriptor should never actually be mutated, which is + // why we return the same hard-coded descriptor every time. It's assumed + // that callers of this method will check the privileges on the descriptor + // (like any other database) and return an error. + ret[i] = dbdesc.NewBuilder(systemschema.SystemDB.DatabaseDesc()).BuildExistingMutable() + } else { + kvIDs = append(kvIDs, id) + indexes = append(indexes, i) + } + } + if len(kvIDs) == 0 { + return ret, nil + } + kvDescs, err := catkv.MustGetDescriptorsByID(ctx, version, sd.codec, txn, vd, kvIDs, catalog.Any) + if err != nil { + return nil, err + } + for j, desc := range kvDescs { + ret[indexes[j]] = desc + } + return ret, nil +} + +func (sd *storedDescriptors) iterateNewVersionByID( + fn func(originalVersion lease.IDVersion) error, +) error { + return sd.descs.IterateByID(func(entry catalog.NameEntry) error { + u := entry.(*storedDescriptor) + if u.storedDescriptorStatus == notValidatedYet { + return nil + } + mut := u.mutable + if mut == nil || mut.IsNew() || !mut.IsUncommittedVersion() { + return nil + } + return fn(lease.NewIDVersionPrev(mut.OriginalName(), mut.OriginalID(), mut.OriginalVersion())) + }) +} + +func (sd *storedDescriptors) iterateUncommittedByID(fn func(imm catalog.Descriptor) error) error { + return sd.descs.IterateByID(func(entry catalog.NameEntry) error { + u := entry.(*storedDescriptor) + if u.storedDescriptorStatus != checkedOutAtLeastOnce || !u.immutable.IsUncommittedVersion() { + return nil + } + return fn(u.immutable) + }) +} + +func (sd *storedDescriptors) getUncommittedTables() (tables []catalog.TableDescriptor) { + _ = sd.iterateUncommittedByID(func(desc catalog.Descriptor) error { + if table, ok := desc.(catalog.TableDescriptor); ok { + tables = append(tables, table) + } + return nil + }) + return tables +} + +func (sd *storedDescriptors) getUncommittedDescriptorsForValidation() (descs []catalog.Descriptor) { + _ = sd.iterateUncommittedByID(func(desc catalog.Descriptor) error { + descs = append(descs, desc) + return nil + }) + return descs +} + +func (sd *storedDescriptors) hasUncommittedTables() (has bool) { + _ = sd.iterateUncommittedByID(func(desc catalog.Descriptor) error { + if _, has = desc.(catalog.TableDescriptor); has { + return iterutil.StopIteration() + } + return nil + }) + return has +} + +func (sd *storedDescriptors) hasUncommittedTypes() (has bool) { + _ = sd.iterateUncommittedByID(func(desc catalog.Descriptor) error { + if _, has = desc.(catalog.TypeDescriptor); has { + return iterutil.StopIteration() + } + return nil + }) + return has +} + +var systemStoredDatabase = &storedDescriptor{ + immutable: dbdesc.NewBuilder(systemschema.SystemDB.DatabaseDesc()).BuildImmutableDatabase(), + // Note that the mutable field is left as nil. We'll generate a new + // value lazily when this is needed, which ought to be exceedingly rare. + mutable: nil, + storedDescriptorStatus: notCheckedOutYet, +} + +func (sd *storedDescriptors) maybeAddSystemDatabase() { + if !sd.addedSystemDatabase { + sd.addedSystemDatabase = true + sd.descs.Upsert(systemStoredDatabase) + } +} diff --git a/pkg/sql/catalog/descs/system_table.go b/pkg/sql/catalog/descs/system_table.go index 4dc423abeb7a..e27423b9b795 100644 --- a/pkg/sql/catalog/descs/system_table.go +++ b/pkg/sql/catalog/descs/system_table.go @@ -49,7 +49,7 @@ func (r *systemTableIDResolver) LookupSystemTableID( if err := r.collectionFactory.Txn(ctx, r.ie, r.db, func( ctx context.Context, txn *kv.Txn, descriptors *Collection, ) (err error) { - id, err = descriptors.kv.lookupName( + id, err = descriptors.stored.lookupName( ctx, txn, nil /* maybeDatabase */, keys.SystemDatabaseID, keys.PublicSchemaID, tableName, ) return err diff --git a/pkg/sql/catalog/descs/table.go b/pkg/sql/catalog/descs/table.go index 804f93e26d51..8865e2969447 100644 --- a/pkg/sql/catalog/descs/table.go +++ b/pkg/sql/catalog/descs/table.go @@ -81,10 +81,10 @@ func (tc *Collection) GetLeasedImmutableTableByID( // GetUncommittedMutableTableByID returns an uncommitted mutable table by its // ID. func (tc *Collection) GetUncommittedMutableTableByID(id descpb.ID) (*tabledesc.Mutable, error) { - if imm, status := tc.uncommitted.getImmutableByID(id); imm == nil || status == notValidatedYet { + if imm, status := tc.stored.getCachedByID(id); imm == nil || status == notValidatedYet { return nil, nil } - mut, err := tc.uncommitted.checkOut(id) + mut, err := tc.stored.checkOut(id) if err != nil { return nil, err } @@ -92,7 +92,7 @@ func (tc *Collection) GetUncommittedMutableTableByID(id descpb.ID) (*tabledesc.M return table, nil } // Check non-table descriptors back in. - return nil, tc.uncommitted.checkIn(mut) + return nil, tc.stored.checkIn(mut) } // GetMutableTableByID returns a mutable table descriptor with diff --git a/pkg/sql/catalog/descs/uncommitted_descriptors.go b/pkg/sql/catalog/descs/uncommitted_descriptors.go deleted file mode 100644 index 0f2b3b81d3d6..000000000000 --- a/pkg/sql/catalog/descs/uncommitted_descriptors.go +++ /dev/null @@ -1,390 +0,0 @@ -// Copyright 2021 The Cockroach Authors. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -package descs - -import ( - "github.com/cockroachdb/cockroach/pkg/keys" - "github.com/cockroachdb/cockroach/pkg/sql/catalog" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/dbdesc" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/nstree" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc" - "github.com/cockroachdb/cockroach/pkg/util/iterutil" - "github.com/cockroachdb/errors" -) - -// uncommittedDescriptorStatus is the status of an uncommitted descriptor. -type uncommittedDescriptorStatus int - -const ( - // notValidatedYet designates descriptors which have been read from - // storage but have not been validated yet. Until they're validated they - // cannot be used for anything else than validating other descriptors. - notValidatedYet uncommittedDescriptorStatus = iota - - // notCheckedOutYet designates descriptors which have been properly read from - // storage and have been validated but have never been checked out yet. This - // means that the mutable and immutable descriptor protos are known to be the - // exact same. - notCheckedOutYet - - // checkedOutAtLeastOnce designates descriptors which have been checked out at - // least once. Newly-created descriptors are considered to have been checked - // out as well. - checkedOutAtLeastOnce -) - -// uncommittedDescriptor is a descriptor that has been modified in the current -// transaction. -type uncommittedDescriptor struct { - - // immutable holds the descriptor as it was when this struct was initialized, - // either after being read from storage or after being checked in. - immutable catalog.Descriptor - - // mutable is initialized as a mutable copy of immutable when the descriptor - // is read from storage. - // This value might be nil in some rare cases where we completely bypass - // storage for performance reasons because the descriptor is guaranteed to - // never change. Such is the case of the system database descriptor for - // instance. - // This value should not make its way outside the uncommittedDescriptors - // other than via checkOut. - mutable catalog.MutableDescriptor - - // uncommittedDescriptorStatus describes the status of the mutable and - // immutable descriptors - uncommittedDescriptorStatus -} - -// GetName implements the catalog.NameEntry interface. -func (u *uncommittedDescriptor) GetName() string { - return u.immutable.GetName() -} - -// GetParentID implements the catalog.NameEntry interface. -func (u *uncommittedDescriptor) GetParentID() descpb.ID { - return u.immutable.GetParentID() -} - -// GetParentSchemaID implements the catalog.NameEntry interface. -func (u uncommittedDescriptor) GetParentSchemaID() descpb.ID { - return u.immutable.GetParentSchemaID() -} - -// GetID implements the catalog.NameEntry interface. -func (u uncommittedDescriptor) GetID() descpb.ID { - return u.immutable.GetID() -} - -// checkOut is how the mutable descriptor should be accessed. -func (u *uncommittedDescriptor) checkOut() catalog.MutableDescriptor { - if u.mutable == nil { - // This special case is allowed for certain system descriptors which - // for performance reasons are never actually read from storage, instead - // we use a copy of the descriptor hard-coded in the system schema used for - // bootstrapping a cluster. - // - // This implies that these descriptors never undergo any changes and - // therefore checking out a mutable descriptor is pointless for the most - // part. This may nonetheless legitimately happen during migrations - // which change all descriptors somehow, so we need to support this. - return u.immutable.NewBuilder().BuildExistingMutable() - } - u.uncommittedDescriptorStatus = checkedOutAtLeastOnce - return u.mutable -} - -var _ catalog.NameEntry = (*uncommittedDescriptor)(nil) - -// uncommittedDescriptors is the data structure holding all -// uncommittedDescriptor objects for a Collection. -// -// Immutable descriptors can be freely looked up. -// Mutable descriptors can be: -// 1. added into it, -// 2. checked out of it, -// 3. checked back in to it. -// -// An error will be triggered by: -// - checking out a mutable descriptor that hasn't yet been added, -// - checking in a descriptor that has been added but not yet checked out, -// - any checked-out-but-not-checked-in mutable descriptors at commit time. -// -type uncommittedDescriptors struct { - - // Descriptors modified by the uncommitted transaction affiliated with this - // Collection. This allows a transaction to see its own modifications while - // bypassing the descriptor lease mechanism. The lease mechanism will have its - // own transaction to read the descriptor and will hang waiting for the - // uncommitted changes to the descriptor. These descriptors are local to this - // Collection and invisible to other transactions. - descs nstree.Map - - // descNames is the set of names which a read or written - // descriptor took on at some point in its lifetime. Everything added to - // uncommittedDescriptors is added to descNames as well - // as all of the known draining names. The idea is that if we find that - // a name is not in the above map but is in the set, then we can avoid - // doing a lookup. - // - // TODO(postamar): better uncommitted namespace changes handling after 22.1. - descNames nstree.Set - - // addedSystemDatabase is used to mark whether the optimization to add the - // system database to the set of uncommitted descriptors has occurred. - addedSystemDatabase bool -} - -func (ud *uncommittedDescriptors) reset() { - ud.descs.Clear() - ud.descNames.Clear() - ud.addedSystemDatabase = false -} - -// add adds a descriptor to the set of uncommitted descriptors and returns -// an immutable copy of that descriptor. -func (ud *uncommittedDescriptors) add( - mut catalog.MutableDescriptor, status uncommittedDescriptorStatus, -) (catalog.Descriptor, error) { - uNew, err := makeUncommittedDescriptor(mut, status) - if err != nil { - return nil, err - } - if prev, ok := ud.descs.GetByID(mut.GetID()).(*uncommittedDescriptor); ok { - if prev.mutable.OriginalVersion() != mut.OriginalVersion() { - return nil, errors.AssertionFailedf( - "cannot add a version of descriptor with a different original version" + - " than it was previously added with") - } - } - ud.descs.Upsert(uNew) - return uNew.immutable, err -} - -// checkOut checks out an uncommitted mutable descriptor for use in the -// transaction. This descriptor should later be checked in again. -func (ud *uncommittedDescriptors) checkOut(id descpb.ID) (_ catalog.MutableDescriptor, err error) { - defer func() { - err = errors.NewAssertionErrorWithWrappedErrf( - err, "cannot check out uncommitted descriptor with ID %d", id, - ) - }() - if id == keys.SystemDatabaseID { - ud.maybeAddSystemDatabase() - } - entry := ud.descs.GetByID(id) - if entry == nil { - return nil, errors.New("descriptor hasn't been added yet") - } - u := entry.(*uncommittedDescriptor) - if u.uncommittedDescriptorStatus == notValidatedYet { - return nil, errors.New("descriptor hasn't been validated yet") - } - return u.checkOut(), nil -} - -// checkIn checks in an uncommitted mutable descriptor that was previously -// checked out. -func (ud *uncommittedDescriptors) checkIn(mut catalog.MutableDescriptor) error { - // TODO(postamar): actually check that the descriptor has been checked out. - _, err := ud.add(mut, checkedOutAtLeastOnce) - return err -} - -func makeUncommittedDescriptor( - desc catalog.MutableDescriptor, status uncommittedDescriptorStatus, -) (*uncommittedDescriptor, error) { - version := desc.GetVersion() - origVersion := desc.OriginalVersion() - if version != origVersion && version != origVersion+1 { - return nil, errors.AssertionFailedf( - "descriptor %d version %d not compatible with cluster version %d", - desc.GetID(), version, origVersion) - } - - mutable, err := maybeRefreshCachedFieldsOnTypeDescriptor(desc) - if err != nil { - return nil, err - } - - return &uncommittedDescriptor{ - mutable: mutable, - immutable: mutable.ImmutableCopy(), - uncommittedDescriptorStatus: status, - }, nil -} - -// maybeRefreshCachedFieldsOnTypeDescriptor refreshes the cached fields on a -// Mutable if the given descriptor is a type descriptor and works as a pass -// through for all other descriptors. Mutable type descriptors are refreshed to -// reconstruct enumMetadata. This ensures that tables hydration following a -// type descriptor update (in the same txn) happens using the modified fields. -func maybeRefreshCachedFieldsOnTypeDescriptor( - desc catalog.MutableDescriptor, -) (catalog.MutableDescriptor, error) { - typeDesc, ok := desc.(catalog.TypeDescriptor) - if ok { - return typedesc.UpdateCachedFieldsOnModifiedMutable(typeDesc) - } - return desc, nil -} - -// getImmutableByID looks up an uncommitted descriptor by ID. -func (ud *uncommittedDescriptors) getImmutableByID( - id descpb.ID, -) (catalog.Descriptor, uncommittedDescriptorStatus) { - if id == keys.SystemDatabaseID { - ud.maybeAddSystemDatabase() - } - entry := ud.descs.GetByID(id) - if entry == nil { - return nil, notValidatedYet - } - u := entry.(*uncommittedDescriptor) - return u.immutable, u.uncommittedDescriptorStatus -} - -// getByName returns a descriptor for the requested name if the requested name -// is for a descriptor modified within the transaction affiliated with the -// Collection. -// -// The first return value "hasKnownRename" is true when there is a known -// rename of that descriptor, so it would be invalid to miss the cache and go to -// KV (where the descriptor prior to the rename may still exist). -func (ud *uncommittedDescriptors) getByName( - dbID descpb.ID, schemaID descpb.ID, name string, -) (hasKnownRename bool, desc catalog.Descriptor) { - if dbID == 0 && schemaID == 0 && name == systemschema.SystemDatabaseName { - ud.maybeAddSystemDatabase() - } - // Walk latest to earliest so that a DROP followed by a CREATE with the same - // name will result in the CREATE being seen. - if got := ud.descs.GetByName(dbID, schemaID, name); got != nil { - u := got.(*uncommittedDescriptor) - if u.uncommittedDescriptorStatus == notValidatedYet { - return false, nil - } - return false, u.immutable - } - // Check whether the set is empty to avoid allocating the NameInfo. - if ud.descNames.Empty() { - return false, nil - } - return ud.descNames.Contains(descpb.NameInfo{ - ParentID: dbID, - ParentSchemaID: schemaID, - Name: name, - }), nil -} - -// getUnvalidatedByName looks up an unvalidated descriptor by name. -func (ud *uncommittedDescriptors) getUnvalidatedByName( - dbID descpb.ID, schemaID descpb.ID, name string, -) catalog.Descriptor { - if dbID == 0 && schemaID == 0 && name == systemschema.SystemDatabaseName { - ud.maybeAddSystemDatabase() - } - entry := ud.descs.GetByName(dbID, schemaID, name) - if entry == nil { - return nil - } - u := entry.(*uncommittedDescriptor) - if u.uncommittedDescriptorStatus != notValidatedYet { - return nil - } - return u.immutable -} - -func (ud *uncommittedDescriptors) iterateNewVersionByID( - fn func(originalVersion lease.IDVersion) error, -) error { - return ud.descs.IterateByID(func(entry catalog.NameEntry) error { - u := entry.(*uncommittedDescriptor) - if u.uncommittedDescriptorStatus == notValidatedYet { - return nil - } - mut := u.mutable - if mut == nil || mut.IsNew() || !mut.IsUncommittedVersion() { - return nil - } - return fn(lease.NewIDVersionPrev(mut.OriginalName(), mut.OriginalID(), mut.OriginalVersion())) - }) -} - -func (ud *uncommittedDescriptors) iterateUncommittedByID( - fn func(imm catalog.Descriptor) error, -) error { - return ud.descs.IterateByID(func(entry catalog.NameEntry) error { - u := entry.(*uncommittedDescriptor) - if u.uncommittedDescriptorStatus != checkedOutAtLeastOnce || !u.immutable.IsUncommittedVersion() { - return nil - } - return fn(u.immutable) - }) -} - -func (ud *uncommittedDescriptors) getUncommittedTables() (tables []catalog.TableDescriptor) { - _ = ud.iterateUncommittedByID(func(desc catalog.Descriptor) error { - if table, ok := desc.(catalog.TableDescriptor); ok { - tables = append(tables, table) - } - return nil - }) - return tables -} - -func (ud *uncommittedDescriptors) getUncommittedDescriptorsForValidation() ( - descs []catalog.Descriptor, -) { - _ = ud.iterateUncommittedByID(func(desc catalog.Descriptor) error { - descs = append(descs, desc) - return nil - }) - return descs -} - -func (ud *uncommittedDescriptors) hasUncommittedTables() (has bool) { - _ = ud.iterateUncommittedByID(func(desc catalog.Descriptor) error { - if _, has = desc.(catalog.TableDescriptor); has { - return iterutil.StopIteration() - } - return nil - }) - return has -} - -func (ud *uncommittedDescriptors) hasUncommittedTypes() (has bool) { - _ = ud.iterateUncommittedByID(func(desc catalog.Descriptor) error { - if _, has = desc.(catalog.TypeDescriptor); has { - return iterutil.StopIteration() - } - return nil - }) - return has -} - -var systemUncommittedDatabase = &uncommittedDescriptor{ - immutable: dbdesc.NewBuilder(systemschema.SystemDB.DatabaseDesc()).BuildImmutableDatabase(), - // Note that the mutable field is left as nil. We'll generate a new - // value lazily when this is needed, which ought to be exceedingly rare. - mutable: nil, - uncommittedDescriptorStatus: notCheckedOutYet, -} - -func (ud *uncommittedDescriptors) maybeAddSystemDatabase() { - if !ud.addedSystemDatabase { - ud.addedSystemDatabase = true - ud.descs.Upsert(systemUncommittedDatabase) - } -} diff --git a/pkg/sql/catalog/descs/validate.go b/pkg/sql/catalog/descs/validate.go index 2ba582cd5293..de9167af0885 100644 --- a/pkg/sql/catalog/descs/validate.go +++ b/pkg/sql/catalog/descs/validate.go @@ -54,7 +54,7 @@ func (tc *Collection) ValidateUncommittedDescriptors(ctx context.Context, txn *k if tc.skipValidationOnWrite || !ValidateOnWriteEnabled.Get(&tc.settings.SV) { return nil } - descs := tc.uncommitted.getUncommittedDescriptorsForValidation() + descs := tc.stored.getUncommittedDescriptorsForValidation() if len(descs) == 0 { return nil } @@ -109,8 +109,8 @@ func (c collectionBackedDereferencer) DereferenceDescriptors( if desc == nil { continue } - if uc, _ := c.tc.uncommitted.getImmutableByID(desc.GetID()); uc == nil { - desc, err = c.tc.uncommitted.add(desc.NewBuilder().BuildExistingMutable(), notValidatedYet) + if uc, _ := c.tc.stored.getCachedByID(desc.GetID()); uc == nil { + desc, err = c.tc.stored.add(ctx, desc.NewBuilder().BuildExistingMutable(), notValidatedYet) if err != nil { return nil, err } @@ -124,7 +124,7 @@ func (c collectionBackedDereferencer) DereferenceDescriptors( func (c collectionBackedDereferencer) fastDescLookup( ctx context.Context, id descpb.ID, ) (catalog.Descriptor, error) { - if uc, _ := c.tc.uncommitted.getImmutableByID(id); uc != nil { + if uc, _ := c.tc.stored.getCachedByID(id); uc != nil { return uc, nil } return nil, nil @@ -178,7 +178,7 @@ func (c collectionBackedDereferencer) fastNamespaceLookup( } case keys.SystemDatabaseID: // Looking up system database objects, which are cached. - id = c.tc.kv.systemNamespace.lookup(req.ParentSchemaID, req.Name) + id = c.tc.stored.systemNamespace.lookup(req.ParentSchemaID, req.Name) return id != descpb.InvalidID, id, nil } return false, descpb.InvalidID, nil diff --git a/pkg/sql/catalog/nstree/by_name_map.go b/pkg/sql/catalog/nstree/by_name_map.go index 2786834a77a2..4a92c816c43b 100644 --- a/pkg/sql/catalog/nstree/by_name_map.go +++ b/pkg/sql/catalog/nstree/by_name_map.go @@ -48,3 +48,27 @@ func (t byNameMap) ascend(f EntryIterator) error { return f(k.(catalog.NameEntry)) }) } + +func (t byNameMap) ascendDatabases(f EntryIterator) error { + min, max := byNameItem{}.get(), byNameItem{parentSchemaID: 1}.get() + defer min.put() + defer max.put() + return ascendRange(t.t, min, max, func(k interface{}) error { + return f(k.(catalog.NameEntry)) + }) +} + +func (t byNameMap) ascendSchemasForDatabase(dbID descpb.ID, f EntryIterator) error { + min := byNameItem{ + parentID: dbID, + }.get() + max := byNameItem{ + parentID: dbID, + parentSchemaID: 1, + }.get() + defer min.put() + defer max.put() + return ascendRange(t.t, min, max, func(k interface{}) error { + return f(k.(catalog.NameEntry)) + }) +} diff --git a/pkg/sql/catalog/nstree/map.go b/pkg/sql/catalog/nstree/map.go index e1bd0737dbf7..d6d8eb357f8f 100644 --- a/pkg/sql/catalog/nstree/map.go +++ b/pkg/sql/catalog/nstree/map.go @@ -99,6 +99,23 @@ func (dt *Map) IterateByName(f EntryIterator) error { return dt.byName.ascend(f) } +// IterateDatabasesByName iterates the database descriptors by name, ascending. +func (dt *Map) IterateDatabasesByName(f EntryIterator) error { + if !dt.initialized() { + return nil + } + return dt.byName.ascendDatabases(f) +} + +// IterateSchemasForDatabaseByName iterates the schema descriptors for the +// database by name, ascending. +func (dt *Map) IterateSchemasForDatabaseByName(dbID descpb.ID, f EntryIterator) error { + if !dt.initialized() { + return nil + } + return dt.byName.ascendSchemasForDatabase(dbID, f) +} + // Len returns the number of descriptors in the tree. func (dt *Map) Len() int { if !dt.initialized() { diff --git a/pkg/sql/catalog/nstree/tree.go b/pkg/sql/catalog/nstree/tree.go index 702425ab7478..d33f38912868 100644 --- a/pkg/sql/catalog/nstree/tree.go +++ b/pkg/sql/catalog/nstree/tree.go @@ -73,3 +73,13 @@ func ascend(t *btree.BTree, f func(k interface{}) error) (err error) { }) return iterutil.Map(err) } + +func ascendRange( + t *btree.BTree, greaterOrEqual, lessThan btree.Item, f func(k interface{}) error, +) (err error) { + t.AscendRange(greaterOrEqual, lessThan, func(i btree.Item) bool { + err = f(i.(item).value()) + return err == nil + }) + return iterutil.Map(err) +} diff --git a/pkg/sql/descriptor.go b/pkg/sql/descriptor.go index 318e7f61283a..07e3e5f76200 100644 --- a/pkg/sql/descriptor.go +++ b/pkg/sql/descriptor.go @@ -275,7 +275,7 @@ func (p *planner) createDescriptorWithID( log.Fatalf(ctx, "unexpected type %T when creating descriptor", mutDesc) } if addUncommitted { - if err := p.Descriptors().AddUncommittedDescriptor(mutDesc); err != nil { + if err := p.Descriptors().AddUncommittedDescriptor(ctx, mutDesc); err != nil { return err } } diff --git a/pkg/sql/importer/import_job.go b/pkg/sql/importer/import_job.go index 9b56c4a67ec6..912bfa730ade 100644 --- a/pkg/sql/importer/import_job.go +++ b/pkg/sql/importer/import_job.go @@ -716,7 +716,7 @@ func createSchemaDescriptorWithID( } switch mutDesc.(type) { case *schemadesc.Mutable: - if err := descsCol.AddUncommittedDescriptor(mutDesc); err != nil { + if err := descsCol.AddUncommittedDescriptor(ctx, mutDesc); err != nil { return err } default: diff --git a/pkg/sql/repair.go b/pkg/sql/repair.go index ed75a3d67e5e..63cce9842c18 100644 --- a/pkg/sql/repair.go +++ b/pkg/sql/repair.go @@ -661,7 +661,7 @@ func (p *planner) UnsafeDeleteDescriptor(ctx context.Context, descID int64, forc if mut != nil { mut.MaybeIncrementVersion() mut.SetDropped() - if err := p.Descriptors().AddUncommittedDescriptor(mut); err != nil { + if err := p.Descriptors().AddUncommittedDescriptor(ctx, mut); err != nil { return errors.WithAssertionFailure(err) } if force {