Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

descs: don't invalidate kvDescriptors cache #84187

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/bench/rttanalysis/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ go_library(
"//pkg/base",
"//pkg/kv/kvclient/kvcoord",
"//pkg/sql",
"//pkg/sql/parser",
"//pkg/testutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
Expand Down
50 changes: 35 additions & 15 deletions pkg/bench/rttanalysis/rtt_analysis_bench.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"testing"

"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -117,32 +118,51 @@ func executeRoundTripTest(b testingB, tc RoundTripBenchTestCase, cc ClusterConst
b.StopTimer()
var r tracingpb.Recording

// The statement trace records individual statements, but we may want to
// execute multiple SQL statements. Note that multi-statement traces won't
// count round trips correctly if there are duplicate statements.
statements, err := parser.Parse(tc.Stmt)
if err != nil {
panic(err)
}

// Do an extra iteration and don't record it in order to deal with effects of
// running it the first time.
for i := 0; i < b.N()+1; i++ {
sql.Exec(b, "CREATE DATABASE bench;")
sql.Exec(b, tc.Setup)
cluster.clearStatementTrace(tc.Stmt)
for _, statement := range statements {
cluster.clearStatementTrace(statement.SQL)
}

b.StartTimer()
sql.Exec(b, tc.Stmt)
b.StopTimer()
var ok bool
r, ok = cluster.getStatementTrace(tc.Stmt)
if !ok {
b.Fatalf(
"could not find number of round trips for statement: %s",
tc.Stmt,
)
}

// If there's a retry error then we're just going to throw away this
// run.
rt, hasRetry := countKvBatchRequestsInRecording(r)
if hasRetry {
i--
} else if i > 0 { // skip the initial iteration
roundTrips += rt
total := 0
for _, statement := range statements {
r, ok = cluster.getStatementTrace(statement.SQL)
if !ok {
b.Fatalf(
"could not find number of round trips for statement: %s",
statement.SQL,
)
}

// If there's a retry error then we're just going to throw away this
// run.
rt, hasRetry := countKvBatchRequestsInRecording(r)
if hasRetry {
i--
ok = false
break
} else if i > 0 { // skip the initial iteration
total += rt
}
}
if ok {
roundTrips += total
}

sql.Exec(b, "DROP DATABASE bench;")
Expand Down
1 change: 1 addition & 0 deletions pkg/bench/rttanalysis/testdata/benchmark_expectations
Original file line number Diff line number Diff line change
Expand Up @@ -90,5 +90,6 @@ exp,benchmark
18,Truncate/truncate_2_column_0_rows
18,Truncate/truncate_2_column_1_rows
18,Truncate/truncate_2_column_2_rows
19,VirtualTableQueries/virtual_table_cache
1,VirtualTableQueries/select_crdb_internal.invalid_objects_with_1_fk
1,VirtualTableQueries/select_crdb_internal.tables_with_1_fk
15 changes: 15 additions & 0 deletions pkg/bench/rttanalysis/virtual_table_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,20 @@ CREATE TABLE t2 (i INT PRIMARY KEY, j INT REFERENCES t1(i));
`,
Stmt: `SELECT * FROM "".crdb_internal.invalid_objects`,
},
// This checks that descriptors are still cached after they are written. We
// expect the second and third selects not to go to KV because the
// descriptors were cached after the first lookup.
{
Name: "virtual table cache",
Setup: `
CREATE TABLE t1 (i INT PRIMARY KEY);
CREATE TABLE t2 (i INT PRIMARY KEY, j INT);`,
Stmt: `
SELECT * FROM crdb_internal.tables;
ALTER TABLE t1 ADD COLUMN j INT;
SELECT * FROM crdb_internal.table_columns;
CREATE INDEX idx ON t2 (j);
SELECT * FROM crdb_internal.index_columns;`,
},
})
}
63 changes: 57 additions & 6 deletions pkg/sql/catalog/descs/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,9 +206,6 @@ func (tc *Collection) HasUncommittedTypes() bool {
// immutably will return a copy of the descriptor in the current state. A deep
// copy is performed in this call.
func (tc *Collection) AddUncommittedDescriptor(desc catalog.MutableDescriptor) error {
// Invalidate all the cached descriptors since a stale copy of this may be
// included.
tc.kv.releaseAllDescriptors()
return tc.uncommitted.checkIn(desc)
}

Expand Down Expand Up @@ -281,7 +278,22 @@ func newMutableSyntheticDescriptorAssertionError(id descpb.ID) error {
// first checking the Collection's cached descriptors for validity if validate
// is set to true before defaulting to a key-value scan, if necessary.
func (tc *Collection) GetAllDescriptors(ctx context.Context, txn *kv.Txn) (nstree.Catalog, error) {
return tc.kv.getAllDescriptors(ctx, txn, tc.version)
cat, err := tc.kv.getAllDescriptors(ctx, txn, tc.version)
if err != nil {
return nstree.Catalog{}, err
}
mutCat := nstree.MutableCatalog{Catalog: cat}
err = tc.uncommitted.iterateMutableDescriptors(func(desc catalog.Descriptor) error {
if tableDesc, isTableDesc := desc.(catalog.TableDescriptor); isTableDesc {
desc, err = tc.hydrateTypesInTableDesc(ctx, txn, tableDesc)
if err != nil {
return err
}
}
mutCat.UpsertDescriptorEntry(desc)
return nil
})
return mutCat.Catalog, err
}

// GetAllDatabaseDescriptors returns all database descriptors visible by the
Expand All @@ -294,7 +306,25 @@ func (tc *Collection) GetAllDatabaseDescriptors(
ctx context.Context, txn *kv.Txn,
) ([]catalog.DatabaseDescriptor, error) {
vd := tc.newValidationDereferencer(txn)
return tc.kv.getAllDatabaseDescriptors(ctx, tc.version, txn, vd)
dbDescs, err := tc.kv.getAllDatabaseDescriptors(ctx, tc.version, txn, vd)
if err != nil {
return nil, err
}
err = tc.uncommitted.iterateMutableDescriptors(func(desc catalog.Descriptor) error {
d, ok := desc.(catalog.DatabaseDescriptor)
if !ok {
return nil
}
for i, dbDesc := range dbDescs {
if dbDesc.GetID() == d.GetID() {
dbDescs[i] = d
return nil
}
}
dbDescs = append(dbDescs, d)
return nil
})
return dbDescs, err
}

// GetAllTableDescriptorsInDatabase returns all the table descriptors visible to
Expand Down Expand Up @@ -335,7 +365,21 @@ func (tc *Collection) GetAllTableDescriptorsInDatabase(
func (tc *Collection) GetSchemasForDatabase(
ctx context.Context, txn *kv.Txn, dbDesc catalog.DatabaseDescriptor,
) (map[descpb.ID]string, error) {
return tc.kv.getSchemasForDatabase(ctx, txn, dbDesc)
schemas, err := tc.kv.getSchemasForDatabase(ctx, txn, dbDesc)
if err != nil {
return nil, err
}
err = tc.uncommitted.iterateMutableDescriptors(func(desc catalog.Descriptor) error {
d, ok := desc.(catalog.SchemaDescriptor)
if !ok {
return nil
}
if d.GetParentID() == dbDesc.GetID() {
schemas[d.GetID()] = d.GetName()
}
return nil
})
return schemas, err
}

// GetObjectNamesAndIDs returns the names and IDs of all objects in a database and schema.
Expand Down Expand Up @@ -435,3 +479,10 @@ func (tc *Collection) AddDeletedDescriptor(id descpb.ID) {
func (tc *Collection) SetSession(session sqlliveness.Session) {
tc.sqlLivenessSession = session
}

func (tc *Collection) idDefinitelyDoesNotExist(id descpb.ID) bool {
if tc.kv.allDescriptors.isUnset() {
return false
}
return !tc.kv.allDescriptors.contains(id) && tc.uncommitted.descs.GetByID(id) == nil
}
120 changes: 120 additions & 0 deletions pkg/sql/catalog/descs/collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,3 +664,123 @@ func TestKVDescriptorsProperlyUsesMemoryMonitoring(t *testing.T) {
require.Equal(t, int64(0), monitor.AllocBytes())
monitor.Stop(ctx)
}

// TestDescriptorCache ensures that when descriptors are modified, a batch
// lookup on the Collection views the latest changes.
func TestDescriptorCache(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{})
defer tc.Stopper().Stop(ctx)

tdb := sqlutils.MakeSQLRunner(tc.ServerConn(0))
tdb.Exec(t, `CREATE DATABASE db`)
tdb.Exec(t, `USE db`)
tdb.Exec(t, `CREATE SCHEMA schema`)
tdb.Exec(t, `CREATE TABLE db.schema.table()`)

s0 := tc.Server(0)
execCfg := s0.ExecutorConfig().(sql.ExecutorConfig)
t.Run("all descriptors", func(t *testing.T) {
require.NoError(t, sql.DescsTxn(ctx, &execCfg, func(
ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
) error {
// Warm up cache.
_, err := descriptors.GetAllDescriptors(ctx, txn)
if err != nil {
return err
}
// Modify table descriptor.
tn := tree.MakeTableNameWithSchema("db", "schema", "table")
flags := tree.ObjectLookupFlagsWithRequired()
flags.RequireMutable = true
_, mut, err := descriptors.GetMutableTableByName(ctx, txn, &tn, flags)
if err != nil {
return err
}
require.NotNil(t, mut)
mut.Name = "new_name"
err = descriptors.AddUncommittedDescriptor(mut)
if err != nil {
return err
}
// The collection's all descriptors should include the modification.
cat, err := descriptors.GetAllDescriptors(ctx, txn)
if err != nil {
return err
}
found := cat.LookupDescriptorEntry(mut.ID)
require.NotEmpty(t, found)
require.Equal(t, found, mut.ImmutableCopy())
return nil
}))
})
t.Run("all db descriptors", func(t *testing.T) {
require.NoError(t, sql.DescsTxn(ctx, &execCfg, func(
ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
) error {
// Warm up cache.
_, err := descriptors.GetAllDatabaseDescriptors(ctx, txn)
if err != nil {
return err
}
// Modify database descriptor.
flags := tree.DatabaseLookupFlags{}
flags.RequireMutable = true
mut, err := descriptors.GetMutableDatabaseByName(ctx, txn, "db", flags)
if err != nil {
return err
}
require.NotNil(t, mut)
mut.Version += 1
err = descriptors.AddUncommittedDescriptor(mut)
if err != nil {
return err
}
// The collection's all database descriptors should reflect the
// modification.
dbDescs, err := descriptors.GetAllDatabaseDescriptors(ctx, txn)
if err != nil {
return err
}
require.Len(t, dbDescs, 4)
require.Equal(t, dbDescs[0], mut.ImmutableCopy())
return nil
}))
})
t.Run("schemas for database", func(t *testing.T) {
require.NoError(t, sql.DescsTxn(ctx, &execCfg, func(
ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
) error {
// Warm up cache.
dbDesc, err := descriptors.GetDatabaseDesc(ctx, txn, "db", tree.DatabaseLookupFlags{})
if err != nil {
return err
}
_, err = descriptors.GetSchemasForDatabase(ctx, txn, dbDesc)
if err != nil {
return err
}
// Modify schema name.
schemaDesc, err := descriptors.GetMutableSchemaByName(ctx, txn, dbDesc, "schema", tree.SchemaLookupFlags{Required: true})
if err != nil {
return err
}
schemaDesc.SchemaDesc().Name = "new_name"
err = descriptors.AddUncommittedDescriptor(schemaDesc.(catalog.MutableDescriptor))
if err != nil {
return err
}
// The collection's schemas for database should reflect the modification.
schemas, err := descriptors.GetSchemasForDatabase(ctx, txn, dbDesc)
if err != nil {
return err
}
require.Len(t, schemas, 2)
require.Equal(t, schemas[schemaDesc.GetID()], schemaDesc.GetName())
return nil
}))
})
}
2 changes: 1 addition & 1 deletion pkg/sql/catalog/descs/descriptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ func (q *byIDLookupContext) lookupLeased(id descpb.ID) (catalog.Descriptor, erro
//
// TODO(ajwerner): More generally leverage this set of kv descriptors on
// the resolution path.
if q.tc.kv.idDefinitelyDoesNotExist(id) {
if q.tc.idDefinitelyDoesNotExist(id) {
return nil, catalog.ErrDescriptorNotFound
}
desc, shouldReadFromStore, err := q.tc.leased.getByID(q.ctx, q.tc.deadlineHolder(q.txn), id)
Expand Down
16 changes: 4 additions & 12 deletions pkg/sql/catalog/descs/kv_descriptors.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@ type kvDescriptors struct {
// allDescriptors is a slice of all available descriptors. The descriptors
// are cached to avoid repeated lookups by users like virtual tables. The
// cache is purged whenever events would cause a scan of all descriptors to
// return different values, such as when the txn timestamp changes or when
// new descriptors are written in the txn.
// return different values, such as when the txn timestamp changes. However,
// when descriptors are written in the transaction, the cache is not purged.
// Users of allDescriptors are responsible for checking uncommitted
// descriptors for such changes.
//
// TODO(ajwerner): This cache may be problematic in clusters with very large
// numbers of descriptors.
Expand Down Expand Up @@ -106,9 +108,6 @@ func (kd *kvDescriptors) reset(ctx context.Context) {

// releaseAllDescriptors releases the cached slice of all descriptors
// held by Collection.
//
// TODO(ajwerner): Make this unnecessary by ensuring that all writes properly
// interact with this layer.
func (kd *kvDescriptors) releaseAllDescriptors() {
kd.allDescriptors.clear()
kd.allDatabaseDescriptors = nil
Expand Down Expand Up @@ -336,10 +335,3 @@ func (kd *kvDescriptors) getSchemasForDatabase(
}
return kd.allSchemasForDatabase[db.GetID()], nil
}

func (kd *kvDescriptors) idDefinitelyDoesNotExist(id descpb.ID) bool {
if kd.allDescriptors.isUnset() {
return false
}
return !kd.allDescriptors.contains(id)
}
16 changes: 16 additions & 0 deletions pkg/sql/catalog/descs/uncommitted_descriptors.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,22 @@ func (ud *uncommittedDescriptors) getUncommittedDescriptorsForValidation() (
return descs
}

// iterateMutableDescriptors applies a function to immutable copies of mutable
// descriptors. This is useful to avoid invalidating the kvDescriptors cache,
// because a collection can use these latest modifications to amend a stale
// descriptors cache.
func (ud *uncommittedDescriptors) iterateMutableDescriptors(
f func(catalog.Descriptor) error,
) error {
return ud.descs.IterateByID(func(entry catalog.NameEntry) error {
mut := entry.(*uncommittedDescriptor).mutable
if mut == nil {
return nil
}
return f(mut.ImmutableCopy())
})
}

func (ud *uncommittedDescriptors) hasUncommittedTables() (has bool) {
_ = ud.iterateUncommittedByID(func(desc catalog.Descriptor) error {
if _, has = desc.(catalog.TableDescriptor); has {
Expand Down