diff --git a/pkg/ccl/backupccl/backup_job.go b/pkg/ccl/backupccl/backup_job.go index 05be3bc970c2..802f5f4e9e43 100644 --- a/pkg/ccl/backupccl/backup_job.go +++ b/pkg/ccl/backupccl/backup_job.go @@ -31,6 +31,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/sql/covering" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -343,9 +344,10 @@ func backup( } var tableStatistics []*stats.TableStatisticProto for i := range backupManifest.Descriptors { - if tableDesc, _, _, _ := descpb.FromDescriptor(&backupManifest.Descriptors[i]); tableDesc != nil { + if tbl, _, _, _ := descpb.FromDescriptor(&backupManifest.Descriptors[i]); tbl != nil { + tableDesc := tabledesc.NewBuilder(tbl).BuildImmutableTable() // Collect all the table stats for this table. - tableStatisticsAcc, err := statsCache.GetTableStats(ctx, tableDesc.GetID()) + tableStatisticsAcc, err := statsCache.GetTableStats(ctx, tableDesc) if err != nil { // Successfully backed up data is more valuable than table stats that can // be recomputed after restore, and so if we fail to collect the stats of a diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index 1fb8431023b2..6a071b16ae5f 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -1830,7 +1830,8 @@ func (r *restoreResumer) ReportResults(ctx context.Context, resultsCh chan<- tre func (r *restoreResumer) notifyStatsRefresherOfNewTables() { details := r.job.Details().(jobspb.RestoreDetails) for i := range details.TableDescs { - r.execCfg.StatsRefresher.NotifyMutation(details.TableDescs[i].GetID(), math.MaxInt32 /* rowsAffected */) + desc := tabledesc.NewBuilder(details.TableDescs[i]).BuildImmutableTable() + r.execCfg.StatsRefresher.NotifyMutation(desc, math.MaxInt32 /* rowsAffected */) } } diff --git a/pkg/ccl/importccl/import_stmt.go b/pkg/ccl/importccl/import_stmt.go index 0dd4096f8e9e..b387c15b3d8a 100644 --- a/pkg/ccl/importccl/import_stmt.go +++ b/pkg/ccl/importccl/import_stmt.go @@ -2273,7 +2273,8 @@ func (r *importResumer) publishTables(ctx context.Context, execCfg *sql.Executor // rows affected per table, so we use a large number because we want to make // sure that stats always get created/refreshed here. for i := range details.Tables { - execCfg.StatsRefresher.NotifyMutation(details.Tables[i].Desc.ID, math.MaxInt32 /* rowsAffected */) + desc := tabledesc.NewBuilder(details.Tables[i].Desc).BuildImmutableTable() + execCfg.StatsRefresher.NotifyMutation(desc, math.MaxInt32 /* rowsAffected */) } return nil diff --git a/pkg/sql/authorization.go b/pkg/sql/authorization.go index fa540f50bdcb..1f3e629eb01a 100644 --- a/pkg/sql/authorization.go +++ b/pkg/sql/authorization.go @@ -162,7 +162,7 @@ func getOwnerOfDesc(desc catalog.Descriptor) security.SQLUsername { if owner.Undefined() { // If the descriptor is ownerless and the descriptor is part of the system db, // node is the owner. - if desc.GetID() == keys.SystemDatabaseID || desc.GetParentID() == keys.SystemDatabaseID { + if catalog.IsSystemDescriptor(desc) { owner = security.NodeUserName() } else { // This check is redundant in this case since admin already has privilege diff --git a/pkg/sql/catalog/descriptor.go b/pkg/sql/catalog/descriptor.go index 8f78c684ec4a..befa7bfa788b 100644 --- a/pkg/sql/catalog/descriptor.go +++ b/pkg/sql/catalog/descriptor.go @@ -477,3 +477,12 @@ func FormatSafeDescriptorProperties(w *redact.StringBuilder, desc Descriptor) { w.Printf(", NumDrainingNames: %d", len(drainingNames)) } } + +// IsSystemDescriptor returns true iff the descriptor is a system or a reserved +// descriptor. +func IsSystemDescriptor(desc Descriptor) bool { + if desc.GetID() <= keys.MaxReservedDescID { + return true + } + return desc.GetParentID() == keys.SystemDatabaseID +} diff --git a/pkg/sql/catalog/lease/helpers_test.go b/pkg/sql/catalog/lease/helpers_test.go index 738d5465ec79..f2fb01cf98c0 100644 --- a/pkg/sql/catalog/lease/helpers_test.go +++ b/pkg/sql/catalog/lease/helpers_test.go @@ -161,7 +161,7 @@ func (m *Manager) PublishMultiple( if err != nil { return nil, err } - expectedVersions[id] = expected + expectedVersions[id] = expected.GetVersion() } descs := make(map[descpb.ID]catalog.MutableDescriptor) diff --git a/pkg/sql/catalog/lease/lease.go b/pkg/sql/catalog/lease/lease.go index ad1a66ac1d70..e9069df59990 100644 --- a/pkg/sql/catalog/lease/lease.go +++ b/pkg/sql/catalog/lease/lease.go @@ -106,32 +106,30 @@ func (m *Manager) WaitForNoVersion( } // WaitForOneVersion returns once there are no unexpired leases on the -// previous version of the descriptor. It returns the current version. +// previous version of the descriptor. It returns the descriptor with the +// current version. // After returning there can only be versions of the descriptor >= to the // returned version. Lease acquisition (see acquire()) maintains the // invariant that no new leases for desc.Version-1 will be granted once // desc.Version exists. func (m *Manager) WaitForOneVersion( ctx context.Context, id descpb.ID, retryOpts retry.Options, -) (descpb.DescriptorVersion, error) { - var version descpb.DescriptorVersion +) (desc catalog.Descriptor, _ error) { for lastCount, r := 0, retry.Start(retryOpts); r.Next(); { - var desc catalog.Descriptor if err := m.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) { desc, err = catalogkv.MustGetDescriptorByID(ctx, txn, m.Codec(), id) return err }); err != nil { - return 0, err + return nil, err } // Check to see if there are any leases that still exist on the previous // version of the descriptor. now := m.storage.clock.Now() descs := []IDVersion{NewIDVersionPrev(desc.GetName(), desc.GetID(), desc.GetVersion())} - version = desc.GetVersion() count, err := CountLeases(ctx, m.storage.internalExecutor, descs, now) if err != nil { - return 0, err + return nil, err } if count == 0 { break @@ -141,7 +139,7 @@ func (m *Manager) WaitForOneVersion( log.Infof(ctx, "waiting for %d leases to expire: desc=%v", count, descs) } } - return version, nil + return desc, nil } // IDVersion represents a descriptor ID, version pair that are diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index 0f88f6875e74..ef5511e52d37 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -2746,8 +2746,7 @@ func (ex *connExecutor) notifyStatsRefresherOfNewTables(ctx context.Context) { // Initiate a run of CREATE STATISTICS. We use a large number // for rowsAffected because we want to make sure that stats always get // created/refreshed here. - ex.planner.execCfg.StatsRefresher. - NotifyMutation(desc.GetID(), math.MaxInt32 /* rowsAffected */) + ex.planner.execCfg.StatsRefresher.NotifyMutation(desc, math.MaxInt32 /* rowsAffected */) } } } diff --git a/pkg/sql/delete.go b/pkg/sql/delete.go index 49dab04b41cc..d123cd231fea 100644 --- a/pkg/sql/delete.go +++ b/pkg/sql/delete.go @@ -140,10 +140,7 @@ func (d *deleteNode) BatchedNext(params runParams) (bool, error) { } // Possibly initiate a run of CREATE STATISTICS. - params.ExecCfg().StatsRefresher.NotifyMutation( - d.run.td.tableDesc().GetID(), - d.run.td.lastBatchSize, - ) + params.ExecCfg().StatsRefresher.NotifyMutation(d.run.td.tableDesc(), d.run.td.lastBatchSize) return d.run.td.lastBatchSize > 0, nil } diff --git a/pkg/sql/delete_range.go b/pkg/sql/delete_range.go index 46f0ce3e733c..17e48123df64 100644 --- a/pkg/sql/delete_range.go +++ b/pkg/sql/delete_range.go @@ -170,7 +170,7 @@ func (d *deleteRangeNode) startExec(params runParams) error { } // Possibly initiate a run of CREATE STATISTICS. - params.ExecCfg().StatsRefresher.NotifyMutation(d.desc.GetID(), d.rowCount) + params.ExecCfg().StatsRefresher.NotifyMutation(d.desc, d.rowCount) return nil } diff --git a/pkg/sql/distsql_plan_stats.go b/pkg/sql/distsql_plan_stats.go index f6b35d0850bf..f2e6a41b90d9 100644 --- a/pkg/sql/distsql_plan_stats.go +++ b/pkg/sql/distsql_plan_stats.go @@ -200,7 +200,7 @@ func (dsp *DistSQLPlanner) createStatsPlan( ) // Estimate the expected number of rows based on existing stats in the cache. - tableStats, err := planCtx.ExtendedEvalCtx.ExecCfg.TableStatsCache.GetTableStats(planCtx.ctx, desc.GetID()) + tableStats, err := planCtx.ExtendedEvalCtx.ExecCfg.TableStatsCache.GetTableStats(planCtx.ctx, desc) if err != nil { return nil, err } diff --git a/pkg/sql/gcjob/gc_job.go b/pkg/sql/gcjob/gc_job.go index 07d5b68f6f78..ce96662bec25 100644 --- a/pkg/sql/gcjob/gc_job.go +++ b/pkg/sql/gcjob/gc_job.go @@ -104,7 +104,8 @@ func (r schemaChangeGCResumer) Resume(ctx context.Context, execCtx interface{}) if len(details.InterleavedIndexes) > 0 { // Before deleting any indexes, ensure that old versions of the table // descriptor are no longer in use. - if err := sql.WaitToUpdateLeases(ctx, execCfg.LeaseManager, details.InterleavedTable.ID); err != nil { + _, err := sql.WaitToUpdateLeases(ctx, execCfg.LeaseManager, details.InterleavedTable.ID) + if err != nil { return err } interleavedIndexIDs := make([]descpb.IndexID, len(details.InterleavedIndexes)) diff --git a/pkg/sql/gcjob/index_garbage_collection.go b/pkg/sql/gcjob/index_garbage_collection.go index 629549b342e7..7ce451ca58fa 100644 --- a/pkg/sql/gcjob/index_garbage_collection.go +++ b/pkg/sql/gcjob/index_garbage_collection.go @@ -19,7 +19,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -44,18 +43,15 @@ func gcIndexes( // Before deleting any indexes, ensure that old versions of the table descriptor // are no longer in use. This is necessary in the case of truncate, where we // schedule a GC Job in the transaction that commits the truncation. - if err := sql.WaitToUpdateLeases(ctx, execCfg.LeaseManager, parentID); err != nil { + parentDesc, err := sql.WaitToUpdateLeases(ctx, execCfg.LeaseManager, parentID) + if err != nil { return err } - var parentTable catalog.TableDescriptor - if err := execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) { - parentTable, err = catalogkv.MustGetTableDescByID(ctx, txn, execCfg.Codec, parentID) - return err - }); err != nil { - return errors.Wrapf(err, "fetching parent table %d", parentID) + parentTable, isTable := parentDesc.(catalog.TableDescriptor) + if !isTable { + return errors.AssertionFailedf("expected descriptor %d to be a table, not %T", parentID, parentDesc) } - for _, index := range droppedIndexes { if index.Status != jobspb.SchemaChangeGCProgress_DELETING { continue diff --git a/pkg/sql/insert.go b/pkg/sql/insert.go index dde50306e5c0..f8c3b49dcd5c 100644 --- a/pkg/sql/insert.go +++ b/pkg/sql/insert.go @@ -262,7 +262,7 @@ func (n *insertNode) BatchedNext(params runParams) (bool, error) { } // Possibly initiate a run of CREATE STATISTICS. - params.ExecCfg().StatsRefresher.NotifyMutation(n.run.ti.tableDesc().GetID(), n.run.ti.lastBatchSize) + params.ExecCfg().StatsRefresher.NotifyMutation(n.run.ti.tableDesc(), n.run.ti.lastBatchSize) return n.run.ti.lastBatchSize > 0, nil } diff --git a/pkg/sql/insert_fast_path.go b/pkg/sql/insert_fast_path.go index 3aa58888bc3a..6795adca9890 100644 --- a/pkg/sql/insert_fast_path.go +++ b/pkg/sql/insert_fast_path.go @@ -309,7 +309,7 @@ func (n *insertFastPathNode) BatchedNext(params runParams) (bool, error) { n.run.done = true // Possibly initiate a run of CREATE STATISTICS. - params.ExecCfg().StatsRefresher.NotifyMutation(n.run.ti.ri.Helper.TableDesc.GetID(), len(n.input)) + params.ExecCfg().StatsRefresher.NotifyMutation(n.run.ti.ri.Helper.TableDesc, len(n.input)) return true, nil } diff --git a/pkg/sql/opt_catalog.go b/pkg/sql/opt_catalog.go index 55ffc5020994..6b8f70ef6340 100644 --- a/pkg/sql/opt_catalog.go +++ b/pkg/sql/opt_catalog.go @@ -419,7 +419,7 @@ func (oc *optCatalog) dataSourceForTable( var tableStats []*stats.TableStatistic if !flags.NoTableStats { var err error - tableStats, err = oc.planner.execCfg.TableStatsCache.GetTableStats(context.TODO(), desc.GetID()) + tableStats, err = oc.planner.execCfg.TableStatsCache.GetTableStats(context.TODO(), desc) if err != nil { // Ignore any error. We still want to be able to run queries even if we lose // access to the statistics table. diff --git a/pkg/sql/schema_changer.go b/pkg/sql/schema_changer.go index 664153387606..7c9af8635512 100644 --- a/pkg/sql/schema_changer.go +++ b/pkg/sql/schema_changer.go @@ -602,7 +602,8 @@ func (sc *SchemaChanger) exec(ctx context.Context) error { // returns, so that the new schema is live everywhere. This is not needed for // correctness but is done to make the UI experience/tests predictable. waitToUpdateLeases := func(refreshStats bool) error { - if err := WaitToUpdateLeases(ctx, sc.leaseMgr, sc.descID); err != nil { + latestDesc, err := WaitToUpdateLeases(ctx, sc.leaseMgr, sc.descID) + if err != nil { if errors.Is(err, catalog.ErrDescriptorNotFound) { return err } @@ -614,7 +615,7 @@ func (sc *SchemaChanger) exec(ctx context.Context) error { // We wait to trigger a stats refresh until we know the leases have been // updated. if refreshStats { - sc.refreshStats() + sc.refreshStats(latestDesc) } return nil } @@ -761,7 +762,8 @@ func (sc *SchemaChanger) handlePermanentSchemaChangeError( // returns, so that the new schema is live everywhere. This is not needed for // correctness but is done to make the UI experience/tests predictable. waitToUpdateLeases := func(refreshStats bool) error { - if err := WaitToUpdateLeases(ctx, sc.leaseMgr, sc.descID); err != nil { + desc, err := WaitToUpdateLeases(ctx, sc.leaseMgr, sc.descID) + if err != nil { if errors.Is(err, catalog.ErrDescriptorNotFound) { return err } @@ -773,7 +775,7 @@ func (sc *SchemaChanger) handlePermanentSchemaChangeError( // We wait to trigger a stats refresh until we know the leases have been // updated. if refreshStats { - sc.refreshStats() + sc.refreshStats(desc) } return nil } @@ -1009,7 +1011,9 @@ func (sc *SchemaChanger) createIndexGCJob( // WaitToUpdateLeases until the entire cluster has been updated to the latest // version of the descriptor. -func WaitToUpdateLeases(ctx context.Context, leaseMgr *lease.Manager, descID descpb.ID) error { +func WaitToUpdateLeases( + ctx context.Context, leaseMgr *lease.Manager, descID descpb.ID, +) (catalog.Descriptor, error) { // Aggressively retry because there might be a user waiting for the // schema change to complete. retryOpts := retry.Options{ @@ -1018,9 +1022,13 @@ func WaitToUpdateLeases(ctx context.Context, leaseMgr *lease.Manager, descID des Multiplier: 1.5, } log.Infof(ctx, "waiting for a single version...") - version, err := leaseMgr.WaitForOneVersion(ctx, descID, retryOpts) + desc, err := leaseMgr.WaitForOneVersion(ctx, descID, retryOpts) + var version descpb.DescriptorVersion + if desc != nil { + version = desc.GetVersion() + } log.Infof(ctx, "waiting for a single version... done (at v %d)", version) - return err + return desc, err } // done finalizes the mutations (adds new cols/indexes to the table). @@ -1465,11 +1473,13 @@ func (sc *SchemaChanger) runStateMachineAndBackfill(ctx context.Context) error { return sc.done(ctx) } -func (sc *SchemaChanger) refreshStats() { +func (sc *SchemaChanger) refreshStats(desc catalog.Descriptor) { // Initiate an asynchronous run of CREATE STATISTICS. We use a large number // for rowsAffected because we want to make sure that stats always get // created/refreshed here. - sc.execCfg.StatsRefresher.NotifyMutation(sc.descID, math.MaxInt32 /* rowsAffected */) + if tableDesc, ok := desc.(catalog.TableDescriptor); ok { + sc.execCfg.StatsRefresher.NotifyMutation(tableDesc, math.MaxInt32 /* rowsAffected */) + } } // maybeReverseMutations reverses the direction of all the mutations with the diff --git a/pkg/sql/stats/BUILD.bazel b/pkg/sql/stats/BUILD.bazel index 8df206a30405..5feff480bc48 100644 --- a/pkg/sql/stats/BUILD.bazel +++ b/pkg/sql/stats/BUILD.bazel @@ -25,6 +25,7 @@ go_library( "//pkg/roachpb:with-mocks", "//pkg/settings", "//pkg/settings/cluster", + "//pkg/sql/catalog", "//pkg/sql/catalog/descpb", "//pkg/sql/catalog/descs", "//pkg/sql/catalog/systemschema", @@ -82,9 +83,11 @@ go_test( "//pkg/server", "//pkg/settings/cluster", "//pkg/sql", + "//pkg/sql/catalog", "//pkg/sql/catalog/catalogkv", "//pkg/sql/catalog/descpb", "//pkg/sql/catalog/descs", + "//pkg/sql/catalog/tabledesc", "//pkg/sql/row", "//pkg/sql/rowenc", "//pkg/sql/rowexec", diff --git a/pkg/sql/stats/automatic_stats.go b/pkg/sql/stats/automatic_stats.go index 142981666e8d..4704e6502ba5 100644 --- a/pkg/sql/stats/automatic_stats.go +++ b/pkg/sql/stats/automatic_stats.go @@ -20,7 +20,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -323,11 +325,22 @@ func (r *Refresher) ensureAllTables( // Use a historical read so as to disable txn contention resolution. getAllTablesQuery := fmt.Sprintf( ` -SELECT table_id FROM crdb_internal.tables AS OF SYSTEM TIME '-%s' -WHERE database_name IS NOT NULL -AND drop_time IS NULL -`, - initialTableCollectionDelay) +SELECT + tbl.table_id +FROM + crdb_internal.tables AS tbl + INNER JOIN system.descriptor AS d ON d.id = tbl.table_id + AS OF SYSTEM TIME '-%s' +WHERE + tbl.database_name IS NOT NULL + AND tbl.database_name <> '%s' + AND tbl.drop_time IS NULL + AND ( + crdb_internal.pb_to_json('cockroach.sql.sqlbase.Descriptor', d.descriptor, false)->'table'->>'viewQuery' + ) IS NULL;`, + initialTableCollectionDelay, + systemschema.SystemDatabaseName, + ) it, err := r.ex.QueryIterator( ctx, @@ -340,10 +353,9 @@ AND drop_time IS NULL for ok, err = it.Next(ctx); ok; ok, err = it.Next(ctx) { row := it.Cur() tableID := descpb.ID(*row[0].(*tree.DInt)) - // Don't create statistics for system tables or virtual tables. - // TODO(rytaft): Don't add views here either. Unfortunately views are not - // identified differently from tables in crdb_internal.tables. - if !descpb.IsReservedID(tableID) && !descpb.IsVirtualTable(tableID) { + // Don't create statistics for virtual tables. + // The query already excludes views and system tables. + if !descpb.IsVirtualTable(tableID) { r.mutationCounts[tableID] += 0 } } @@ -363,32 +375,26 @@ AND drop_time IS NULL // Refresher that a table has been mutated. It should be called after any // successful insert, update, upsert or delete. rowsAffected refers to the // number of rows written as part of the mutation operation. -func (r *Refresher) NotifyMutation(tableID descpb.ID, rowsAffected int) { +func (r *Refresher) NotifyMutation(table catalog.TableDescriptor, rowsAffected int) { if !AutomaticStatisticsClusterMode.Get(&r.st.SV) { // Automatic stats are disabled. return } - - if descpb.IsReservedID(tableID) { - // Don't try to create statistics for system tables (most importantly, - // for table_statistics itself). - return - } - if descpb.IsVirtualTable(tableID) { - // Don't try to create statistics for virtual tables. + if !hasStatistics(table) { + // Don't collect stats for this kind of table: system, virtual, view, etc. return } // Send mutation info to the refresher thread to avoid adding latency to // the calling transaction. select { - case r.mutations <- mutation{tableID: tableID, rowsAffected: rowsAffected}: + case r.mutations <- mutation{tableID: table.GetID(), rowsAffected: rowsAffected}: default: // Don't block if there is no room in the buffered channel. if bufferedChanFullLogLimiter.ShouldLog() { log.Warningf(context.TODO(), - "buffered channel is full. Unable to refresh stats for table %d with %d rows affected", - tableID, rowsAffected) + "buffered channel is full. Unable to refresh stats for table %q (%d) with %d rows affected", + table.GetName(), table.GetID(), rowsAffected) } } } @@ -402,7 +408,7 @@ func (r *Refresher) maybeRefreshStats( rowsAffected int64, asOf time.Duration, ) { - tableStats, err := r.cache.GetTableStats(ctx, tableID) + tableStats, err := r.cache.getTableStatsFromCache(ctx, tableID) if err != nil { log.Errorf(ctx, "failed to get table statistics: %v", err) return diff --git a/pkg/sql/stats/automatic_stats_test.go b/pkg/sql/stats/automatic_stats_test.go index 4464e560dbd5..231d85b0b1c9 100644 --- a/pkg/sql/stats/automatic_stats_test.go +++ b/pkg/sql/stats/automatic_stats_test.go @@ -23,9 +23,11 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/types" @@ -74,7 +76,7 @@ func TestMaybeRefreshStats(t *testing.T) { refresher := MakeRefresher(st, executor, cache, time.Microsecond /* asOfTime */) // There should not be any stats yet. - if err := checkStatsCount(ctx, cache, descA.GetID(), 0 /* expected */); err != nil { + if err := checkStatsCount(ctx, cache, descA, 0 /* expected */); err != nil { t.Fatal(err) } @@ -83,7 +85,7 @@ func TestMaybeRefreshStats(t *testing.T) { refresher.maybeRefreshStats( ctx, s.Stopper(), descA.GetID(), 0 /* rowsAffected */, time.Microsecond, /* asOf */ ) - if err := checkStatsCount(ctx, cache, descA.GetID(), 1 /* expected */); err != nil { + if err := checkStatsCount(ctx, cache, descA, 1 /* expected */); err != nil { t.Fatal(err) } @@ -92,7 +94,7 @@ func TestMaybeRefreshStats(t *testing.T) { refresher.maybeRefreshStats( ctx, s.Stopper(), descA.GetID(), 0 /* rowsAffected */, time.Microsecond, /* asOf */ ) - if err := checkStatsCount(ctx, cache, descA.GetID(), 1 /* expected */); err != nil { + if err := checkStatsCount(ctx, cache, descA, 1 /* expected */); err != nil { t.Fatal(err) } @@ -101,7 +103,7 @@ func TestMaybeRefreshStats(t *testing.T) { refresher.maybeRefreshStats( ctx, s.Stopper(), descA.GetID(), 10 /* rowsAffected */, time.Microsecond, /* asOf */ ) - if err := checkStatsCount(ctx, cache, descA.GetID(), 2 /* expected */); err != nil { + if err := checkStatsCount(ctx, cache, descA, 2 /* expected */); err != nil { t.Fatal(err) } @@ -140,7 +142,7 @@ func TestAverageRefreshTime(t *testing.T) { INSERT INTO t.a VALUES (1);`) executor := s.InternalExecutor().(sqlutil.InternalExecutor) - tableID := catalogkv.TestingGetTableDescriptor(s.DB(), keys.SystemSQLCodec, "t", "a").GetID() + table := catalogkv.TestingGetTableDescriptor(s.DB(), keys.SystemSQLCodec, "t", "a") cache := NewTableStatisticsCache( ctx, 10, /* cacheSize */ @@ -160,7 +162,7 @@ func TestAverageRefreshTime(t *testing.T) { checkAverageRefreshTime := func(expected time.Duration) error { return testutils.SucceedsSoonError(func() error { - stats, err := cache.GetTableStats(ctx, tableID) + stats, err := cache.GetTableStats(ctx, table) if err != nil { return err } @@ -176,7 +178,7 @@ func TestAverageRefreshTime(t *testing.T) { // expectedAge time ago if lessThan is true (false). checkMostRecentStat := func(expectedAge time.Duration, lessThan bool) error { return testutils.SucceedsSoonError(func() error { - stats, err := cache.GetTableStats(ctx, tableID) + stats, err := cache.GetTableStats(ctx, table) if err != nil { return err } @@ -218,7 +220,7 @@ func TestAverageRefreshTime(t *testing.T) { "distinctCount", "nullCount" ) VALUES ($1, $2, $3, $4, $5, $6, $7)`, - tableID, + table.GetID(), name, columnIDs, createdAt, @@ -252,7 +254,7 @@ func TestAverageRefreshTime(t *testing.T) { }); err != nil { t.Fatal(err) } - if err := checkStatsCount(ctx, cache, tableID, 10 /* expected */); err != nil { + if err := checkStatsCount(ctx, cache, table, 10 /* expected */); err != nil { t.Fatal(err) } @@ -284,7 +286,7 @@ func TestAverageRefreshTime(t *testing.T) { }); err != nil { t.Fatal(err) } - if err := checkStatsCount(ctx, cache, tableID, 20 /* expected */); err != nil { + if err := checkStatsCount(ctx, cache, table, 20 /* expected */); err != nil { t.Fatal(err) } @@ -307,9 +309,9 @@ func TestAverageRefreshTime(t *testing.T) { // the statistics on table t. With rowsAffected=0, the probability of refresh // is 0. refresher.maybeRefreshStats( - ctx, s.Stopper(), tableID, 0 /* rowsAffected */, time.Microsecond, /* asOf */ + ctx, s.Stopper(), table.GetID(), 0 /* rowsAffected */, time.Microsecond, /* asOf */ ) - if err := checkStatsCount(ctx, cache, tableID, 20 /* expected */); err != nil { + if err := checkStatsCount(ctx, cache, table, 20 /* expected */); err != nil { t.Fatal(err) } @@ -335,7 +337,7 @@ func TestAverageRefreshTime(t *testing.T) { }); err != nil { t.Fatal(err) } - if err := checkStatsCount(ctx, cache, tableID, 30 /* expected */); err != nil { + if err := checkStatsCount(ctx, cache, table, 30 /* expected */); err != nil { t.Fatal(err) } @@ -357,9 +359,9 @@ func TestAverageRefreshTime(t *testing.T) { // remain (5 from column k and 10 from column v), since the old stats on k // were deleted. refresher.maybeRefreshStats( - ctx, s.Stopper(), tableID, 0 /* rowsAffected */, time.Microsecond, /* asOf */ + ctx, s.Stopper(), table.GetID(), 0 /* rowsAffected */, time.Microsecond, /* asOf */ ) - if err := checkStatsCount(ctx, cache, tableID, 15 /* expected */); err != nil { + if err := checkStatsCount(ctx, cache, table, 15 /* expected */); err != nil { t.Fatal(err) } } @@ -473,10 +475,13 @@ func TestMutationsChannel(t *testing.T) { mutations: make(chan mutation, refreshChanBufferLen), } + tbl := descpb.TableDescriptor{ID: 53, ParentID: 52, Name: "foo"} + tableDesc := tabledesc.NewBuilder(&tbl).BuildImmutableTable() + // Test that the mutations channel doesn't block even when we add 10 more // items than can fit in the buffer. for i := 0; i < refreshChanBufferLen+10; i++ { - r.NotifyMutation(descpb.ID(53), 5 /* rowsAffected */) + r.NotifyMutation(tableDesc, 5 /* rowsAffected */) } if expected, actual := refreshChanBufferLen, len(r.mutations); expected != actual { @@ -520,10 +525,10 @@ func TestDefaultColumns(t *testing.T) { } func checkStatsCount( - ctx context.Context, cache *TableStatisticsCache, tableID descpb.ID, expected int, + ctx context.Context, cache *TableStatisticsCache, table catalog.TableDescriptor, expected int, ) error { return testutils.SucceedsSoonError(func() error { - stats, err := cache.GetTableStats(ctx, tableID) + stats, err := cache.GetTableStats(ctx, table) if err != nil { return err } diff --git a/pkg/sql/stats/delete_stats_test.go b/pkg/sql/stats/delete_stats_test.go index 9c31ff3fcdf2..e86048daaaa0 100644 --- a/pkg/sql/stats/delete_stats_test.go +++ b/pkg/sql/stats/delete_stats_test.go @@ -273,7 +273,7 @@ func TestDeleteOldStatsForColumns(t *testing.T) { } return testutils.SucceedsSoonError(func() error { - tableStats, err := cache.GetTableStats(ctx, tableID) + tableStats, err := cache.getTableStatsFromCache(ctx, tableID) if err != nil { return err } @@ -281,7 +281,7 @@ func TestDeleteOldStatsForColumns(t *testing.T) { for i := range testData { stat := &testData[i] if stat.TableID != tableID { - stats, err := cache.GetTableStats(ctx, stat.TableID) + stats, err := cache.getTableStatsFromCache(ctx, stat.TableID) if err != nil { return err } diff --git a/pkg/sql/stats/stats_cache.go b/pkg/sql/stats/stats_cache.go index c276b1d800ef..5a8443b01a49 100644 --- a/pkg/sql/stats/stats_cache.go +++ b/pkg/sql/stats/stats_cache.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" @@ -206,7 +207,7 @@ func decodeTableStatisticsKV( return descpb.ID(uint32(*tableID)), nil } -// GetTableStats looks up statistics for the requested table ID in the cache, +// GetTableStats looks up statistics for the requested table in the cache, // and if the stats are not present in the cache, it looks them up in // system.table_statistics. // @@ -216,18 +217,37 @@ func decodeTableStatisticsKV( // // The statistics are ordered by their CreatedAt time (newest-to-oldest). func (sc *TableStatisticsCache) GetTableStats( - ctx context.Context, tableID descpb.ID, + ctx context.Context, table catalog.TableDescriptor, ) ([]*TableStatistic, error) { - if descpb.IsReservedID(tableID) { + if !hasStatistics(table) { + return nil, nil + } + return sc.getTableStatsFromCache(ctx, table.GetID()) +} + +// hasStatistics returns true if the table can have statistics collected for it. +func hasStatistics(table catalog.TableDescriptor) bool { + if catalog.IsSystemDescriptor(table) { // Don't try to get statistics for system tables (most importantly, // for table_statistics itself). - return nil, nil + return false } - if descpb.IsVirtualTable(tableID) { + if table.IsVirtualTable() { // Don't try to get statistics for virtual tables. - return nil, nil + return false + } + if table.IsView() { + // Don't try to get statistics for views. + return false } + return true +} +// getTableStatsFromCache is like GetTableStats but assumes that the table ID +// is safe to fetch statistics for: non-system, non-virtual, non-view, etc. +func (sc *TableStatisticsCache) getTableStatsFromCache( + ctx context.Context, tableID descpb.ID, +) ([]*TableStatistic, error) { sc.mu.Lock() defer sc.mu.Unlock() diff --git a/pkg/sql/stats/stats_cache_test.go b/pkg/sql/stats/stats_cache_test.go index eb9eac046187..f0b9f4ac02d4 100644 --- a/pkg/sql/stats/stats_cache_test.go +++ b/pkg/sql/stats/stats_cache_test.go @@ -115,7 +115,7 @@ func checkStatsForTable( // Perform the lookup and refresh, and confirm the // returned stats match the expected values. - statsList, err := sc.GetTableStats(ctx, tableID) + statsList, err := sc.getTableStatsFromCache(ctx, tableID) if err != nil { t.Fatalf("error retrieving stats: %s", err) } @@ -357,7 +357,7 @@ CREATE STATISTICS s FROM tt; tbl := catalogkv.TestingGetTableDescriptor(kvDB, keys.SystemSQLCodec, "t", "tt") // Get stats for our table. We are ensuring here that the access to the stats // for tt properly hydrates the user defined type t before access. - stats, err := sc.GetTableStats(ctx, tbl.GetID()) + stats, err := sc.GetTableStats(ctx, tbl) if err != nil { t.Fatal(err) } @@ -373,7 +373,7 @@ CREATE STATISTICS s FROM tt; sc.InvalidateTableStats(ctx, tbl.GetID()) // Verify that GetTableStats ignores the statistic on the now unknown type and // returns the rest. - stats, err = sc.GetTableStats(ctx, tbl.GetID()) + stats, err = sc.GetTableStats(ctx, tbl) if err != nil { t.Fatal(err) } @@ -429,7 +429,7 @@ func TestCacheWait(t *testing.T) { for n := 0; n < 10; n++ { wg.Add(1) go func() { - stats, err := sc.GetTableStats(ctx, id) + stats, err := sc.getTableStatsFromCache(ctx, id) if err != nil { t.Error(err) } else if !checkStats(stats, expectedStats[id]) { @@ -480,10 +480,9 @@ func TestCacheAutoRefresh(t *testing.T) { sr0.Exec(t, "INSERT INTO test.t VALUES (1, 1), (2, 2), (3, 3)") tableDesc := catalogkv.TestingGetTableDescriptor(tc.Server(0).DB(), keys.SystemSQLCodec, "test", "t") - tableID := tableDesc.GetID() expectNStats := func(n int) error { - stats, err := sc.GetTableStats(ctx, tableID) + stats, err := sc.GetTableStats(ctx, tableDesc) if err != nil { t.Fatal(err) } diff --git a/pkg/sql/type_change.go b/pkg/sql/type_change.go index 0e69095ba3a4..946a3bd75585 100644 --- a/pkg/sql/type_change.go +++ b/pkg/sql/type_change.go @@ -218,7 +218,7 @@ func refreshTypeDescriptorLeases( ids = append(ids, typeDesc.GetArrayTypeID()) } for _, id := range ids { - if updateErr := WaitToUpdateLeases(ctx, leaseMgr, id); updateErr != nil { + if _, updateErr := WaitToUpdateLeases(ctx, leaseMgr, id); updateErr != nil { // Swallow the descriptor not found error. if errors.Is(updateErr, catalog.ErrDescriptorNotFound) { log.Infof(ctx, diff --git a/pkg/sql/update.go b/pkg/sql/update.go index 0fac095eff04..98afd1dbe9a0 100644 --- a/pkg/sql/update.go +++ b/pkg/sql/update.go @@ -198,10 +198,7 @@ func (u *updateNode) BatchedNext(params runParams) (bool, error) { } // Possibly initiate a run of CREATE STATISTICS. - params.ExecCfg().StatsRefresher.NotifyMutation( - u.run.tu.tableDesc().GetID(), - u.run.tu.lastBatchSize, - ) + params.ExecCfg().StatsRefresher.NotifyMutation(u.run.tu.tableDesc(), u.run.tu.lastBatchSize) return u.run.tu.lastBatchSize > 0, nil } diff --git a/pkg/sql/upsert.go b/pkg/sql/upsert.go index 2fb8aa0cd947..598b183f446b 100644 --- a/pkg/sql/upsert.go +++ b/pkg/sql/upsert.go @@ -126,10 +126,7 @@ func (n *upsertNode) BatchedNext(params runParams) (bool, error) { } // Possibly initiate a run of CREATE STATISTICS. - params.ExecCfg().StatsRefresher.NotifyMutation( - n.run.tw.tableDesc().GetID(), - n.run.tw.lastBatchSize, - ) + params.ExecCfg().StatsRefresher.NotifyMutation(n.run.tw.tableDesc(), n.run.tw.lastBatchSize) return n.run.tw.lastBatchSize > 0, nil }