Skip to content

Commit

Permalink
descs: unify uncommitted and kv descriptors
Browse files Browse the repository at this point in the history
Relates to cockroachdb#64673.

Previously, the collection maintained two separate sources:
`uncommittedDescriptors` and `kvDescriptors`. This was confusing because
`uncommittedDescriptors` was intended to contain modified descriptors,
but it also held descriptors cached from KV point lookups. This commit
fixes this by introducing `storedDescriptors`, which is the combination
of the above sources.

This commit works towards a model where `storedDescriptors` is a mirror
of the descriptors in KV. These descriptors were either cached after KV
reads, or were modified by the associated transaction and should be
written to KV upon commit. Now, all descriptors read from storage are
stored in the same btree, eliminating possible duplication between
`kvDescriptors` and `uncommittedDescriptors`.

As a byproduct of this change, we are able to better leverage caching
due to virtual table lookups within a transaction. First, we no longer
need to invalidate batches of cached descriptors after schema changes.
Second, point lookups after a range lookup will properly check the
descriptors cached due to the range lookup.

Release note: None
  • Loading branch information
Jason Chan committed Jul 19, 2022
1 parent 7d021d3 commit 0c2a6f7
Show file tree
Hide file tree
Showing 19 changed files with 995 additions and 819 deletions.
10 changes: 5 additions & 5 deletions pkg/bench/rttanalysis/testdata/benchmark_expectations
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ exp,benchmark
17,DropDatabase/drop_database_1_table
18,DropDatabase/drop_database_2_tables
19,DropDatabase/drop_database_3_tables
20,DropRole/drop_1_role
27,DropRole/drop_2_roles
34,DropRole/drop_3_roles
18,DropRole/drop_1_role
25,DropRole/drop_2_roles
32,DropRole/drop_3_roles
17,DropSequence/drop_1_sequence
19,DropSequence/drop_2_sequences
21,DropSequence/drop_3_sequences
Expand Down Expand Up @@ -92,5 +92,5 @@ exp,benchmark
18,Truncate/truncate_2_column_2_rows
1,VirtualTableQueries/select_crdb_internal.invalid_objects_with_1_fk
1,VirtualTableQueries/select_crdb_internal.tables_with_1_fk
6,VirtualTableQueries/virtual_table_cache_with_point_lookups
21,VirtualTableQueries/virtual_table_cache_with_schema_change
4,VirtualTableQueries/virtual_table_cache_with_point_lookups
12,VirtualTableQueries/virtual_table_cache_with_schema_change
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -2188,7 +2188,7 @@ func (r *restoreResumer) dropDescriptors(
// descriptor validation.
mutSchema.SetDropped()
mutSchema.MaybeIncrementVersion()
if err := descsCol.AddUncommittedDescriptor(mutSchema); err != nil {
if err := descsCol.AddUncommittedDescriptor(ctx, mutSchema); err != nil {
return err
}

Expand Down Expand Up @@ -2258,7 +2258,7 @@ func (r *restoreResumer) dropDescriptors(
// descriptor validation.
db.SetDropped()
db.MaybeIncrementVersion()
if err := descsCol.AddUncommittedDescriptor(db); err != nil {
if err := descsCol.AddUncommittedDescriptor(ctx, db); err != nil {
return err
}

Expand Down
3 changes: 1 addition & 2 deletions pkg/sql/catalog/descs/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ go_library(
"dist_sql_type_resolver.go",
"factory.go",
"hydrate.go",
"kv_descriptors.go",
"leased_descriptors.go",
"object.go",
"schema.go",
"stored_descriptors.go",
"synthetic_descriptors.go",
"system_database_namespace_cache.go",
"system_table.go",
Expand All @@ -23,7 +23,6 @@ go_library(
"temporary_descriptors.go",
"txn.go",
"type.go",
"uncommitted_descriptors.go",
"validate.go",
"virtual_descriptors.go",
],
Expand Down
57 changes: 28 additions & 29 deletions pkg/sql/catalog/descs/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func makeCollection(
hydratedTables: hydratedTables,
virtual: makeVirtualDescriptors(virtualSchemas),
leased: makeLeasedDescriptors(leaseMgr),
kv: makeKVDescriptors(codec, systemNamespace, monitor),
stored: makeStoredDescriptors(codec, systemNamespace, monitor),
temporary: makeTemporaryDescriptors(settings, codec, temporarySchemaProvider),
direct: makeDirect(ctx, codec, settings),
}
Expand All @@ -83,17 +83,17 @@ type Collection struct {
// the transaction using them is complete.
leased leasedDescriptors

// Descriptors modified by the uncommitted transaction affiliated with this
// Collection. This allows a transaction to see its own modifications while
// bypassing the descriptor lease mechanism. The lease mechanism will have its
// own transaction to read the descriptor and will hang waiting for the
// uncommitted changes to the descriptor if this transaction is PRIORITY HIGH.
// These descriptors are local to this Collection and their state is thus not
// visible to other transactions.
uncommitted uncommittedDescriptors

// A collection of descriptors which were read from the store.
kv kvDescriptors
// A mirror of the descriptors in storage. These descriptors are either (1)
// already stored and were read from KV, or (2) have been modified by the
// uncommitted transaction affiliated with this Collection and should be
// written to KV upon commit.
// Source (1) serves as a cache. Source (2) allows a transaction to see its
// own modifications while bypassing the descriptor lease mechanism. The
// lease mechanism will have its own transaction to read the descriptor and
// will hang waiting for the uncommitted changes to the descriptor if this
// transaction is PRIORITY HIGH. These descriptors are local to this
// Collection and their state is thus not visible to other transactions.
stored storedDescriptors

// syntheticDescriptors contains in-memory descriptors which override all
// other matching descriptors during immutable descriptor resolution (by name
Expand Down Expand Up @@ -154,7 +154,7 @@ func (tc *Collection) ResetMaxTimestampBound() {
tc.maxTimestampBoundDeadlineHolder.maxTimestampBound = hlc.Timestamp{}
}

// SkipValidationOnWrite avoids validating uncommitted descriptors prior to
// SkipValidationOnWrite avoids validating stored descriptors prior to
// a transaction commit.
func (tc *Collection) SkipValidationOnWrite() {
tc.skipValidationOnWrite = true
Expand All @@ -177,8 +177,7 @@ func (tc *Collection) ReleaseLeases(ctx context.Context) {
// ReleaseAll calls ReleaseLeases.
func (tc *Collection) ReleaseAll(ctx context.Context) {
tc.ReleaseLeases(ctx)
tc.uncommitted.reset()
tc.kv.reset(ctx)
tc.stored.reset(ctx)
tc.synthetic.reset()
tc.deletedDescs = catalog.DescriptorIDSet{}
tc.skipValidationOnWrite = false
Expand All @@ -187,13 +186,13 @@ func (tc *Collection) ReleaseAll(ctx context.Context) {
// HasUncommittedTables returns true if the Collection contains uncommitted
// tables.
func (tc *Collection) HasUncommittedTables() bool {
return tc.uncommitted.hasUncommittedTables()
return tc.stored.hasUncommittedTables()
}

// HasUncommittedTypes returns true if the Collection contains uncommitted
// types.
func (tc *Collection) HasUncommittedTypes() bool {
return tc.uncommitted.hasUncommittedTypes()
return tc.stored.hasUncommittedTypes()
}

// AddUncommittedDescriptor adds an uncommitted descriptor modified in the
Expand All @@ -205,11 +204,11 @@ func (tc *Collection) HasUncommittedTypes() bool {
// will return this exact object. Subsequent attempts to resolve this descriptor
// immutably will return a copy of the descriptor in the current state. A deep
// copy is performed in this call.
func (tc *Collection) AddUncommittedDescriptor(desc catalog.MutableDescriptor) error {
// Invalidate all the cached descriptors since a stale copy of this may be
// included.
tc.kv.releaseAllDescriptors()
return tc.uncommitted.checkIn(desc)
func (tc *Collection) AddUncommittedDescriptor(
ctx context.Context, desc catalog.MutableDescriptor,
) error {
_, err := tc.stored.add(ctx, desc, checkedOutAtLeastOnce)
return err
}

// ValidateOnWriteEnabled is the cluster setting used to enable or disable
Expand All @@ -232,7 +231,7 @@ func (tc *Collection) WriteDescToBatch(
return err
}
}
if err := tc.AddUncommittedDescriptor(desc); err != nil {
if err := tc.AddUncommittedDescriptor(ctx, desc); err != nil {
return err
}
descKey := catalogkeys.MakeDescMetadataKey(tc.codec(), desc.GetID())
Expand Down Expand Up @@ -260,7 +259,7 @@ func (tc *Collection) WriteDesc(
// returned for each schema change is ClusterVersion - 1, because that's the one
// that will be used when checking for table descriptor two version invariance.
func (tc *Collection) GetDescriptorsWithNewVersion() (originalVersions []lease.IDVersion) {
_ = tc.uncommitted.iterateNewVersionByID(func(originalVersion lease.IDVersion) error {
_ = tc.stored.iterateNewVersionByID(func(originalVersion lease.IDVersion) error {
originalVersions = append(originalVersions, originalVersion)
return nil
})
Expand All @@ -270,7 +269,7 @@ func (tc *Collection) GetDescriptorsWithNewVersion() (originalVersions []lease.I
// GetUncommittedTables returns all the tables updated or created in the
// transaction.
func (tc *Collection) GetUncommittedTables() (tables []catalog.TableDescriptor) {
return tc.uncommitted.getUncommittedTables()
return tc.stored.getUncommittedTables()
}

func newMutableSyntheticDescriptorAssertionError(id descpb.ID) error {
Expand All @@ -281,7 +280,7 @@ func newMutableSyntheticDescriptorAssertionError(id descpb.ID) error {
// first checking the Collection's cached descriptors for validity if validate
// is set to true before defaulting to a key-value scan, if necessary.
func (tc *Collection) GetAllDescriptors(ctx context.Context, txn *kv.Txn) (nstree.Catalog, error) {
return tc.kv.getAllDescriptors(ctx, txn, tc.version)
return tc.stored.getAllDescriptors(ctx, txn, tc.version)
}

// GetAllDatabaseDescriptors returns all database descriptors visible by the
Expand All @@ -294,7 +293,7 @@ func (tc *Collection) GetAllDatabaseDescriptors(
ctx context.Context, txn *kv.Txn,
) ([]catalog.DatabaseDescriptor, error) {
vd := tc.newValidationDereferencer(txn)
return tc.kv.getAllDatabaseDescriptors(ctx, tc.version, txn, vd)
return tc.stored.getAllDatabaseDescriptors(ctx, tc.version, txn, vd)
}

// GetAllTableDescriptorsInDatabase returns all the table descriptors visible to
Expand Down Expand Up @@ -335,7 +334,7 @@ func (tc *Collection) GetAllTableDescriptorsInDatabase(
func (tc *Collection) GetSchemasForDatabase(
ctx context.Context, txn *kv.Txn, dbDesc catalog.DatabaseDescriptor,
) (map[descpb.ID]string, error) {
return tc.kv.getSchemasForDatabase(ctx, txn, dbDesc)
return tc.stored.getSchemasForDatabase(ctx, txn, dbDesc)
}

// GetObjectNamesAndIDs returns the names and IDs of all objects in a database and schema.
Expand Down Expand Up @@ -416,7 +415,7 @@ func (tc *Collection) RemoveSyntheticDescriptor(id descpb.ID) {
}

func (tc *Collection) codec() keys.SQLCodec {
return tc.kv.codec
return tc.stored.codec
}

// AddDeletedDescriptor is temporarily tracking descriptors that have been,
Expand Down
126 changes: 123 additions & 3 deletions pkg/sql/catalog/descs/collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func TestAddUncommittedDescriptorAndMutableResolution(t *testing.T) {
require.NoError(t, err)
require.Same(t, immByID, immResolvedWithNewNameButHasOldName)

require.NoError(t, descriptors.AddUncommittedDescriptor(mut))
require.NoError(t, descriptors.AddUncommittedDescriptor(ctx, mut))

immByNameAfter, err := descriptors.GetImmutableDatabaseByName(ctx, txn, "new_name", flags)
require.NoError(t, err)
Expand Down Expand Up @@ -598,7 +598,7 @@ func TestCollectionPreservesPostDeserializationChanges(t *testing.T) {

// TestCollectionProperlyUsesMemoryMonitoring ensures that memory monitoring
// on Collection is working properly.
// Namely, we are currently only tracking memory usage on Collection.kvDescriptors
// Namely, we are currently only tracking memory usage on Collection.storedDescriptors
// since it reads all descriptors from storage, which can be huge.
//
// The testing strategy is to
Expand All @@ -610,7 +610,7 @@ func TestCollectionPreservesPostDeserializationChanges(t *testing.T) {
// 3. Change the monitor budget to something small. Repeat step 2 and expect an error
// being thrown out when reading all those descriptors into memory to validate the
// memory monitor indeed kicked in and had an effect.
func TestKVDescriptorsProperlyUsesMemoryMonitoring(t *testing.T) {
func TestCollectionProperlyUsesMemoryMonitoring(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

Expand Down Expand Up @@ -663,3 +663,123 @@ func TestKVDescriptorsProperlyUsesMemoryMonitoring(t *testing.T) {
require.Equal(t, int64(0), monitor.AllocBytes())
monitor.Stop(ctx)
}

// TestDescriptorCache ensures that when descriptors are modified, a batch
// lookup on the Collection views the latest changes.
func TestDescriptorCache(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{})
defer tc.Stopper().Stop(ctx)

tdb := sqlutils.MakeSQLRunner(tc.ServerConn(0))
tdb.Exec(t, `CREATE DATABASE db`)
tdb.Exec(t, `USE db`)
tdb.Exec(t, `CREATE SCHEMA schema`)
tdb.Exec(t, `CREATE TABLE db.schema.table()`)

s0 := tc.Server(0)
execCfg := s0.ExecutorConfig().(sql.ExecutorConfig)
t.Run("all descriptors", func(t *testing.T) {
require.NoError(t, sql.DescsTxn(ctx, &execCfg, func(
ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
) error {
// Warm up cache.
_, err := descriptors.GetAllDescriptors(ctx, txn)
if err != nil {
return err
}
// Modify table descriptor.
tn := tree.MakeTableNameWithSchema("db", "schema", "table")
flags := tree.ObjectLookupFlagsWithRequired()
flags.RequireMutable = true
_, mut, err := descriptors.GetMutableTableByName(ctx, txn, &tn, flags)
if err != nil {
return err
}
require.NotNil(t, mut)
mut.Name = "new_name"
err = descriptors.AddUncommittedDescriptor(ctx, mut)
if err != nil {
return err
}
// The collection's all descriptors should include the modification.
cat, err := descriptors.GetAllDescriptors(ctx, txn)
if err != nil {
return err
}
found := cat.LookupDescriptorEntry(mut.ID)
require.NotEmpty(t, found)
require.Equal(t, found, mut.ImmutableCopy())
return nil
}))
})
t.Run("all db descriptors", func(t *testing.T) {
require.NoError(t, sql.DescsTxn(ctx, &execCfg, func(
ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
) error {
// Warm up cache.
_, err := descriptors.GetAllDatabaseDescriptors(ctx, txn)
if err != nil {
return err
}
// Modify database descriptor.
flags := tree.DatabaseLookupFlags{}
flags.RequireMutable = true
mut, err := descriptors.GetMutableDatabaseByName(ctx, txn, "db", flags)
if err != nil {
return err
}
require.NotNil(t, mut)
mut.Version += 1
err = descriptors.AddUncommittedDescriptor(ctx, mut)
if err != nil {
return err
}
// The collection's all database descriptors should reflect the
// modification.
dbDescs, err := descriptors.GetAllDatabaseDescriptors(ctx, txn)
if err != nil {
return err
}
require.Len(t, dbDescs, 4)
require.Equal(t, mut.ImmutableCopy(), dbDescs[0])
return nil
}))
})
t.Run("schemas for database", func(t *testing.T) {
require.NoError(t, sql.DescsTxn(ctx, &execCfg, func(
ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
) error {
// Warm up cache.
dbDesc, err := descriptors.GetDatabaseDesc(ctx, txn, "db", tree.DatabaseLookupFlags{})
if err != nil {
return err
}
_, err = descriptors.GetSchemasForDatabase(ctx, txn, dbDesc)
if err != nil {
return err
}
// Modify schema name.
schemaDesc, err := descriptors.GetMutableSchemaByName(ctx, txn, dbDesc, "schema", tree.SchemaLookupFlags{Required: true})
if err != nil {
return err
}
schemaDesc.SchemaDesc().Name = "new_name"
err = descriptors.AddUncommittedDescriptor(ctx, schemaDesc.(catalog.MutableDescriptor))
if err != nil {
return err
}
// The collection's schemas for database should reflect the modification.
schemas, err := descriptors.GetSchemasForDatabase(ctx, txn, dbDesc)
if err != nil {
return err
}
require.Len(t, schemas, 2)
require.Equal(t, schemaDesc.GetName(), schemas[schemaDesc.GetID()])
return nil
}))
})
}
Loading

0 comments on commit 0c2a6f7

Please sign in to comment.