diff --git a/pkg/gossip/keys.go b/pkg/gossip/keys.go index 7a8c771dadd7..0528ff873130 100644 --- a/pkg/gossip/keys.go +++ b/pkg/gossip/keys.go @@ -186,22 +186,6 @@ func MakeTableStatAddedKey(tableID uint32) string { return MakeKey(KeyTableStatAddedPrefix, strconv.FormatUint(uint64(tableID), 10 /* base */)) } -// TableIDFromTableStatAddedKey attempts to extract the table ID from the -// provided key. -// The key should have been constructed by MakeTableStatAddedKey. -// Returns an error if the key is not of the correct type or is not parsable. -func TableIDFromTableStatAddedKey(key string) (uint32, error) { - trimmedKey, err := removePrefixFromKey(key, KeyTableStatAddedPrefix) - if err != nil { - return 0, err - } - tableID, err := strconv.ParseUint(trimmedKey, 10 /* base */, 32 /* bitSize */) - if err != nil { - return 0, errors.Wrapf(err, "failed parsing table ID from key %q", key) - } - return uint32(tableID), nil -} - // removePrefixFromKey removes the key prefix and separator and returns what's // left. Returns an error if the key doesn't have this prefix. func removePrefixFromKey(key, prefix string) (string, error) { diff --git a/pkg/kv/kvclient/rangefeed/rangefeed.go b/pkg/kv/kvclient/rangefeed/rangefeed.go index fbea8d55fc28..f182ea9df1a5 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeed.go +++ b/pkg/kv/kvclient/rangefeed/rangefeed.go @@ -93,8 +93,13 @@ func newFactory(stopper *stop.Stopper, client kvDB, knobs *TestingKnobs) *Factor } } -// RangeFeed constructs a new RangeFeed. The only error which can be returned -// will indicate that the server is being shut down. +// RangeFeed constructs a new RangeFeed. +// +// The range feed can be stopped via Close(); otherwise, it will stop when the +// server shuts down. +// +// The only error which can be returned will indicate that the server is being +// shut down. func (f *Factory) RangeFeed( ctx context.Context, name string, diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index cbf6535c86c0..e52758bd5dec 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -522,13 +522,14 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { ), TableStatsCache: stats.NewTableStatisticsCache( + ctx, cfg.TableStatCacheSize, - cfg.gossip, cfg.db, cfg.circularInternalExecutor, codec, leaseMgr, cfg.Settings, + cfg.rangeFeedFactory, ), QueryCache: querycache.New(cfg.QueryCacheSize), diff --git a/pkg/server/testserver.go b/pkg/server/testserver.go index 502e395545bd..65a8427fb7e3 100644 --- a/pkg/server/testserver.go +++ b/pkg/server/testserver.go @@ -336,6 +336,14 @@ func (ts *TestServer) Gossip() *gossip.Gossip { return nil } +// RangeFeedFactory is part of serverutils.TestServerInterface. +func (ts *TestServer) RangeFeedFactory() interface{} { + if ts != nil { + return ts.sqlServer.execCfg.RangeFeedFactory + } + return (*rangefeed.Factory)(nil) +} + // Clock returns the clock used by the TestServer. func (ts *TestServer) Clock() *hlc.Clock { if ts != nil { diff --git a/pkg/sql/catalog/lease/lease.go b/pkg/sql/catalog/lease/lease.go index 3395eaadd7c6..abb8efb19039 100644 --- a/pkg/sql/catalog/lease/lease.go +++ b/pkg/sql/catalog/lease/lease.go @@ -1853,6 +1853,8 @@ func (m *Manager) watchForRangefeedUpdates( } } // Ignore errors here because they indicate that the server is shutting down. + // Also note that the range feed automatically shuts down when the server + // shuts down, so we don't need to call Close() ourselves. _, _ = m.rangeFeedFactory.RangeFeed( ctx, "lease", descriptorTableSpan, m.getResolvedTimestamp(), handleEvent, ) diff --git a/pkg/sql/create_stats.go b/pkg/sql/create_stats.go index f5a443bfb901..49e3cf186d48 100644 --- a/pkg/sql/create_stats.go +++ b/pkg/sql/create_stats.go @@ -565,12 +565,6 @@ func (r *createStatsResumer) Resume(ctx context.Context, execCtx interface{}) er }); err != nil { return err } - - // Refresh the local cache if Gossip is not available. - if _, ok := evalCtx.ExecCfg.Gossip.Optional(47925); !ok { - evalCtx.ExecCfg.TableStatsCache.RefreshTableStats(ctx, r.tableID) - } - // Record this statistics creation in the event log. if !createStatsPostEvents.Get(&evalCtx.Settings.SV) { return nil diff --git a/pkg/sql/stats/BUILD.bazel b/pkg/sql/stats/BUILD.bazel index 21bba9faed7e..9da5b862e4bb 100644 --- a/pkg/sql/stats/BUILD.bazel +++ b/pkg/sql/stats/BUILD.bazel @@ -20,12 +20,14 @@ go_library( "//pkg/gossip", "//pkg/keys", "//pkg/kv", + "//pkg/kv/kvclient/rangefeed", "//pkg/roachpb", "//pkg/settings", "//pkg/settings/cluster", "//pkg/sql/catalog/descpb", "//pkg/sql/catalog/descs", "//pkg/sql/catalog/lease", + "//pkg/sql/catalog/systemschema", "//pkg/sql/opt/cat", "//pkg/sql/parser", "//pkg/sql/pgwire/pgcode", @@ -37,6 +39,7 @@ go_library( "//pkg/util", "//pkg/util/cache", "//pkg/util/encoding", + "//pkg/util/hlc", "//pkg/util/log", "//pkg/util/mon", "//pkg/util/protoutil", @@ -56,7 +59,6 @@ go_test( "automatic_stats_test.go", "create_stats_job_test.go", "delete_stats_test.go", - "gossip_invalidation_test.go", "histogram_test.go", "main_test.go", "row_sampling_test.go", @@ -65,11 +67,11 @@ go_test( embed = [":stats"], deps = [ "//pkg/base", - "//pkg/gossip", "//pkg/jobs", "//pkg/jobs/jobspb", "//pkg/keys", "//pkg/kv", + "//pkg/kv/kvclient/rangefeed", "//pkg/kv/kvserver", "//pkg/kv/kvserver/kvserverbase", "//pkg/roachpb", diff --git a/pkg/sql/stats/automatic_stats_test.go b/pkg/sql/stats/automatic_stats_test.go index 499c12d47275..aa64e9b76eb4 100644 --- a/pkg/sql/stats/automatic_stats_test.go +++ b/pkg/sql/stats/automatic_stats_test.go @@ -18,9 +18,9 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/base" - "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" @@ -61,13 +61,14 @@ func TestMaybeRefreshStats(t *testing.T) { executor := s.InternalExecutor().(sqlutil.InternalExecutor) descA := catalogkv.TestingGetTableDescriptor(s.DB(), keys.SystemSQLCodec, "t", "a") cache := NewTableStatisticsCache( + ctx, 10, /* cacheSize */ - gossip.MakeOptionalGossip(s.GossipI().(*gossip.Gossip)), kvDB, executor, keys.SystemSQLCodec, s.LeaseManager().(*lease.Manager), s.ClusterSettings(), + s.RangeFeedFactory().(*rangefeed.Factory), ) refresher := MakeRefresher(st, executor, cache, time.Microsecond /* asOfTime */) @@ -140,13 +141,14 @@ func TestAverageRefreshTime(t *testing.T) { executor := s.InternalExecutor().(sqlutil.InternalExecutor) tableID := catalogkv.TestingGetTableDescriptor(s.DB(), keys.SystemSQLCodec, "t", "a").GetID() cache := NewTableStatisticsCache( + ctx, 10, /* cacheSize */ - gossip.MakeOptionalGossip(s.GossipI().(*gossip.Gossip)), kvDB, executor, keys.SystemSQLCodec, s.LeaseManager().(*lease.Manager), s.ClusterSettings(), + s.RangeFeedFactory().(*rangefeed.Factory), ) refresher := MakeRefresher(st, executor, cache, time.Microsecond /* asOfTime */) @@ -156,7 +158,6 @@ func TestAverageRefreshTime(t *testing.T) { curTime := timeutil.Now() checkAverageRefreshTime := func(expected time.Duration) error { - cache.RefreshTableStats(ctx, tableID) return testutils.SucceedsSoonError(func() error { stats, err := cache.GetTableStats(ctx, tableID) if err != nil { @@ -173,7 +174,6 @@ func TestAverageRefreshTime(t *testing.T) { // Checks that the most recent statistic was created less than (greater than) // expectedAge time ago if lessThan is true (false). checkMostRecentStat := func(expectedAge time.Duration, lessThan bool) error { - cache.RefreshTableStats(ctx, tableID) return testutils.SucceedsSoonError(func() error { stats, err := cache.GetTableStats(ctx, tableID) if err != nil { @@ -388,13 +388,14 @@ func TestAutoStatsReadOnlyTables(t *testing.T) { executor := s.InternalExecutor().(sqlutil.InternalExecutor) cache := NewTableStatisticsCache( + ctx, 10, /* cacheSize */ - gossip.MakeOptionalGossip(s.GossipI().(*gossip.Gossip)), kvDB, executor, keys.SystemSQLCodec, s.LeaseManager().(*lease.Manager), s.ClusterSettings(), + s.RangeFeedFactory().(*rangefeed.Factory), ) refresher := MakeRefresher(st, executor, cache, time.Microsecond /* asOfTime */) @@ -435,13 +436,14 @@ func TestNoRetryOnFailure(t *testing.T) { executor := s.InternalExecutor().(sqlutil.InternalExecutor) cache := NewTableStatisticsCache( + ctx, 10, /* cacheSize */ - gossip.MakeOptionalGossip(s.GossipI().(*gossip.Gossip)), kvDB, executor, keys.SystemSQLCodec, s.LeaseManager().(*lease.Manager), s.ClusterSettings(), + s.RangeFeedFactory().(*rangefeed.Factory), ) r := MakeRefresher(st, executor, cache, time.Microsecond /* asOfTime */) @@ -519,7 +521,6 @@ func TestDefaultColumns(t *testing.T) { func checkStatsCount( ctx context.Context, cache *TableStatisticsCache, tableID descpb.ID, expected int, ) error { - cache.RefreshTableStats(ctx, tableID) return testutils.SucceedsSoonError(func() error { stats, err := cache.GetTableStats(ctx, tableID) if err != nil { diff --git a/pkg/sql/stats/delete_stats_test.go b/pkg/sql/stats/delete_stats_test.go index fb4eb438c312..fd52bcf5cf15 100644 --- a/pkg/sql/stats/delete_stats_test.go +++ b/pkg/sql/stats/delete_stats_test.go @@ -18,9 +18,9 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/base" - "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" @@ -40,13 +40,14 @@ func TestDeleteOldStatsForColumns(t *testing.T) { defer s.Stopper().Stop(ctx) ex := s.InternalExecutor().(sqlutil.InternalExecutor) cache := NewTableStatisticsCache( + ctx, 10, /* cacheSize */ - gossip.MakeOptionalGossip(s.GossipI().(*gossip.Gossip)), db, ex, keys.SystemSQLCodec, s.LeaseManager().(*lease.Manager), s.ClusterSettings(), + s.RangeFeedFactory().(*rangefeed.Factory), ) // The test data must be ordered by CreatedAt DESC so the calculated set of @@ -270,14 +271,6 @@ func TestDeleteOldStatsForColumns(t *testing.T) { return err } - cache.RefreshTableStats(ctx, tableID) - for i := range testData { - stat := &testData[i] - if stat.TableID != tableID { - cache.RefreshTableStats(ctx, stat.TableID) - } - } - return testutils.SucceedsSoonError(func() error { tableStats, err := cache.GetTableStats(ctx, tableID) if err != nil { diff --git a/pkg/sql/stats/gossip_invalidation_test.go b/pkg/sql/stats/gossip_invalidation_test.go deleted file mode 100644 index ac16e9415b49..000000000000 --- a/pkg/sql/stats/gossip_invalidation_test.go +++ /dev/null @@ -1,88 +0,0 @@ -// Copyright 2017 The Cockroach Authors. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -package stats_test - -import ( - "context" - "fmt" - "testing" - - "github.com/cockroachdb/cockroach/pkg/base" - "github.com/cockroachdb/cockroach/pkg/gossip" - "github.com/cockroachdb/cockroach/pkg/keys" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease" - "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" - "github.com/cockroachdb/cockroach/pkg/sql/stats" - "github.com/cockroachdb/cockroach/pkg/testutils" - "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" - "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" - "github.com/cockroachdb/cockroach/pkg/util/leaktest" - "github.com/cockroachdb/cockroach/pkg/util/log" -) - -// TestGossipInvalidation verifies that the cache gets invalidated automatically -// when a new stat is generated. -func TestGossipInvalidation(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - ctx := context.Background() - tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{}) - defer tc.Stopper().Stop(ctx) - - s := tc.Server(0) - sc := stats.NewTableStatisticsCache( - 10, /* cacheSize */ - gossip.MakeOptionalGossip(s.GossipI().(*gossip.Gossip)), - s.DB(), - s.InternalExecutor().(sqlutil.InternalExecutor), - keys.SystemSQLCodec, - s.LeaseManager().(*lease.Manager), - s.ClusterSettings(), - ) - - sr0 := sqlutils.MakeSQLRunner(tc.ServerConn(0)) - sr0.Exec(t, "CREATE DATABASE test") - sr0.Exec(t, "CREATE TABLE test.t (k INT PRIMARY KEY, v INT)") - sr0.Exec(t, "INSERT INTO test.t VALUES (1, 1), (2, 2), (3, 3)") - - tableDesc := catalogkv.TestingGetTableDescriptor(tc.Server(0).DB(), keys.SystemSQLCodec, "test", "t") - tableID := tableDesc.GetID() - - expectNStats := func(n int) error { - stats, err := sc.GetTableStats(ctx, tableID) - if err != nil { - t.Fatal(err) - } - if len(stats) != n { - return fmt.Errorf("expected %d stats, got: %v", n, stats) - } - return nil - } - - if err := expectNStats(0); err != nil { - t.Fatal(err) - } - sr1 := sqlutils.MakeSQLRunner(tc.ServerConn(1)) - sr1.Exec(t, "CREATE STATISTICS k ON k FROM test.t") - - testutils.SucceedsSoon(t, func() error { - return expectNStats(1) - }) - - sr2 := sqlutils.MakeSQLRunner(tc.ServerConn(2)) - sr2.Exec(t, "CREATE STATISTICS v ON v FROM test.t") - - testutils.SucceedsSoon(t, func() error { - return expectNStats(2) - }) -} diff --git a/pkg/sql/stats/main_test.go b/pkg/sql/stats/main_test.go index b1e23149a2d2..a452deb16c53 100644 --- a/pkg/sql/stats/main_test.go +++ b/pkg/sql/stats/main_test.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/security/securitytest" "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/randutil" ) @@ -25,5 +26,6 @@ func TestMain(m *testing.M) { security.SetAssetLoader(securitytest.EmbeddedAssets) randutil.SeedForTests() serverutils.InitTestServerFactory(server.TestServerFactory) + serverutils.InitTestClusterFactory(testcluster.TestClusterFactory) os.Exit(m.Run()) } diff --git a/pkg/sql/stats/new_stat.go b/pkg/sql/stats/new_stat.go index 4ac29c9ca4fc..195cbbd5945a 100644 --- a/pkg/sql/stats/new_stat.go +++ b/pkg/sql/stats/new_stat.go @@ -108,6 +108,11 @@ func InsertNewStat( // GossipTableStatAdded causes the statistic caches for this table to be // invalidated. +// +// Note that we no longer use gossip to keep the cache up-to-date, but we still +// send the updates for mixed-version clusters during upgrade. +// +// TODO(radu): remove this in 22.1. func GossipTableStatAdded(g *gossip.Gossip, tableID descpb.ID) error { // TODO(radu): perhaps use a TTL here to avoid having a key per table floating // around forever (we would need the stat cache to evict old entries diff --git a/pkg/sql/stats/stats_cache.go b/pkg/sql/stats/stats_cache.go index e7500033bf9a..b94ff46b5695 100644 --- a/pkg/sql/stats/stats_cache.go +++ b/pkg/sql/stats/stats_cache.go @@ -14,14 +14,15 @@ import ( "context" "sync" - "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" "github.com/cockroachdb/cockroach/pkg/sql/opt/cat" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -29,6 +30,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/cache" "github.com/cockroachdb/cockroach/pkg/util/encoding" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/syncutil" @@ -67,6 +69,9 @@ type TableStatisticsCache struct { LeaseMgr *lease.Manager Settings *cluster.Settings + + // Used when decoding KV from the range feed. + datumAlloc rowenc.DatumAlloc } // The cache stores *cacheEntry objects. The fields are protected by the @@ -78,18 +83,22 @@ type cacheEntry struct { mustWait bool waitCond sync.Cond - // If refreshing is true, the current statistics for this table are stale, - // and we are in the process of fetching the updated stats from the database. - // In the mean time, other callers can use the stale stats and do not need to - // wait. + // lastRefreshTimestamp is the timestamp at which the last refresh was + // requested; note that the refresh may be ongoing. + // It is zero for entries that were not refreshed since they were added to the + // cache. + lastRefreshTimestamp hlc.Timestamp + + // If refreshing is true, we are in the process of fetching the updated stats + // from the database. In the mean time, other callers can use the stale stats + // and do not need to wait. // - // If a goroutine tries to perform a refresh when a refresh is already - // in progress, it will see that refreshing=true and will set the - // mustRefreshAgain flag to true before returning. When the original - // goroutine that was performing the refresh returns from the database and - // sees that mustRefreshAgain=true, it will trigger another refresh. - refreshing bool - mustRefreshAgain bool + // If a goroutine tries to perform a refresh when a refresh is already in + // progress, it will see that refreshing=true and will just update + // lastRefreshTimestamp before returning. When the original goroutine that was + // performing the refresh returns from the database and sees that the + // timestamp was moved, it will trigger another refresh. + refreshing bool stats []*TableStatistic @@ -100,13 +109,14 @@ type cacheEntry struct { // NewTableStatisticsCache creates a new TableStatisticsCache that can hold // statistics for tables. func NewTableStatisticsCache( + ctx context.Context, cacheSize int, - gw gossip.OptionalGossip, db *kv.DB, sqlExecutor sqlutil.InternalExecutor, codec keys.SQLCodec, leaseManager *lease.Manager, settings *cluster.Settings, + rangeFeedFactory *rangefeed.Factory, ) *TableStatisticsCache { tableStatsCache := &TableStatisticsCache{ ClientDB: db, @@ -119,27 +129,81 @@ func NewTableStatisticsCache( Policy: cache.CacheLRU, ShouldEvict: func(s int, key, value interface{}) bool { return s > cacheSize }, }) - // The stat cache requires redundant callbacks as it is using gossip to - // signal the presence of new stats, not to actually propagate them. - if g, ok := gw.Optional(47925); ok { - g.RegisterCallback( - gossip.MakePrefixPattern(gossip.KeyTableStatAddedPrefix), - tableStatsCache.tableStatAddedGossipUpdate, - gossip.Redundant, - ) + + // Set up a range feed to watch for updates to system.table_statistics. + + statsTablePrefix := codec.TablePrefix(keys.TableStatisticsTableID) + statsTableSpan := roachpb.Span{ + Key: statsTablePrefix, + EndKey: statsTablePrefix.PrefixEnd(), } + + var lastTableID descpb.ID + var lastTS hlc.Timestamp + + handleEvent := func(ctx context.Context, kv *roachpb.RangeFeedValue) { + tableID, err := decodeTableStatisticsKV(codec, kv, &tableStatsCache.datumAlloc) + if err != nil { + log.Warningf(ctx, "failed to decode table statistics row %v: %v", kv.Key, err) + return + } + ts := kv.Value.Timestamp + // A statistics collection inserts multiple rows in one transaction. We + // don't want to call refreshTableStats for each row since it has + // non-trivial overhead. + if tableID == lastTableID && ts == lastTS { + return + } + lastTableID = tableID + lastTS = ts + tableStatsCache.refreshTableStats(ctx, tableID, ts) + } + + // Notes: + // - the range feed automatically stops on server shutdown, we don't need to + // call Close() ourselves. + // - an error here only happens if the server is already shutting down; we + // can safely ignore it. + _, _ = rangeFeedFactory.RangeFeed( + ctx, + "table-stats-cache", + statsTableSpan, + db.Clock().Now(), + handleEvent, + ) + return tableStatsCache } -// tableStatAddedGossipUpdate is the gossip callback that fires when a new -// statistic is available for a table. -func (sc *TableStatisticsCache) tableStatAddedGossipUpdate(key string, value roachpb.Value) { - tableID, err := gossip.TableIDFromTableStatAddedKey(key) +// decodeTableStatisticsKV decodes the table ID from a range feed event on +// system.table_statistics. +func decodeTableStatisticsKV( + codec keys.SQLCodec, kv *roachpb.RangeFeedValue, da *rowenc.DatumAlloc, +) (tableDesc descpb.ID, err error) { + tbl := systemschema.TableStatisticsTable + // The primary key of table_statistics is (tableID INT, statisticID INT). + types := []*types.T{types.Int, types.Int} + dirs := []descpb.IndexDescriptor_Direction{descpb.IndexDescriptor_ASC, descpb.IndexDescriptor_ASC} + keyVals := make([]rowenc.EncDatum, 2) + _, matches, _, err := rowenc.DecodeIndexKey( + codec, tbl, tbl.GetPrimaryIndex().IndexDesc(), types, keyVals, dirs, kv.Key, + ) if err != nil { - log.Errorf(context.Background(), "tableStatAddedGossipUpdate(%s) error: %v", key, err) - return + return 0, err + } + if !matches { + return 0, errors.New("descriptor does not match") + } + + if err := keyVals[0].EnsureDecoded(types[0], da); err != nil { + return 0, err } - sc.RefreshTableStats(context.Background(), descpb.ID(tableID)) + + tableID, ok := keyVals[0].Datum.(*tree.DInt) + if !ok { + return 0, errors.New("invalid tableID value") + } + return descpb.ID(uint32(*tableID)), nil } // GetTableStats looks up statistics for the requested table ID in the cache, @@ -267,7 +331,9 @@ func (sc *TableStatisticsCache) addCacheEntryLocked( // - stats are retrieved from database: // - mutex is locked again and the entry is updated. // -func (sc *TableStatisticsCache) refreshCacheEntry(ctx context.Context, tableID descpb.ID) { +func (sc *TableStatisticsCache) refreshCacheEntry( + ctx context.Context, tableID descpb.ID, ts hlc.Timestamp, +) { sc.mu.Lock() defer sc.mu.Unlock() @@ -279,11 +345,16 @@ func (sc *TableStatisticsCache) refreshCacheEntry(ctx context.Context, tableID d if !found || e.err != nil { return } + if ts.LessEq(e.lastRefreshTimestamp) { + // We already refreshed at this (or a later) timestamp. + return + } + e.lastRefreshTimestamp = ts - // Don't perform a refresh if a refresh is already in progress, but let that - // goroutine know it needs to refresh again. + // Don't perform a refresh if a refresh is already in progress; that goroutine + // will know it needs to refresh again because we changed the + // lastRefreshTimestamp. if e.refreshing { - e.mustRefreshAgain = true return } e.refreshing = true @@ -297,13 +368,15 @@ func (sc *TableStatisticsCache) refreshCacheEntry(ctx context.Context, tableID d defer sc.mu.Lock() log.VEventf(ctx, 1, "refreshing statistics for table %d", tableID) + // TODO(radu): pass the timestamp and use AS OF SYSTEM TIME. stats, err = sc.getTableStatsFromDB(ctx, tableID) log.VEventf(ctx, 1, "done refreshing statistics for table %d", tableID) }() - if !e.mustRefreshAgain { + if e.lastRefreshTimestamp.Equal(ts) { break } - e.mustRefreshAgain = false + // The timestamp has changed; another refresh was requested. + ts = e.lastRefreshTimestamp } e.stats, e.err = stats, err @@ -315,23 +388,21 @@ func (sc *TableStatisticsCache) refreshCacheEntry(ctx context.Context, tableID d } } -// RefreshTableStats refreshes the cached statistics for the given table ID -// by fetching the new stats from the database. -func (sc *TableStatisticsCache) RefreshTableStats(ctx context.Context, tableID descpb.ID) { +// refreshTableStats refreshes the cached statistics for the given table ID by +// fetching the new stats from the database. +func (sc *TableStatisticsCache) refreshTableStats( + ctx context.Context, tableID descpb.ID, ts hlc.Timestamp, +) { log.VEventf(ctx, 1, "refreshing statistics for table %d", tableID) ctx, span := tracing.ForkSpan(ctx, "refresh-table-stats") // Perform an asynchronous refresh of the cache. go func() { defer span.Finish() - sc.refreshCacheEntry(ctx, tableID) + sc.refreshCacheEntry(ctx, tableID, ts) }() } // InvalidateTableStats invalidates the cached statistics for the given table ID. -// -// Note that RefreshTableStats should normally be used instead of this function. -// This function is used only when we want to guarantee that the next query -// uses updated stats. func (sc *TableStatisticsCache) InvalidateTableStats(ctx context.Context, tableID descpb.ID) { log.VEventf(ctx, 1, "evicting statistics for table %d", tableID) sc.mu.Lock() diff --git a/pkg/sql/stats/stats_cache_test.go b/pkg/sql/stats/stats_cache_test.go index 2a2552d6b708..cec2f09e9175 100644 --- a/pkg/sql/stats/stats_cache_test.go +++ b/pkg/sql/stats/stats_cache_test.go @@ -12,6 +12,7 @@ package stats import ( "context" + "fmt" "math/rand" "reflect" "sort" @@ -20,9 +21,9 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/base" - "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease" @@ -31,6 +32,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -233,13 +235,14 @@ func TestCacheBasic(t *testing.T) { // will result in the cache getting populated. When the stats cache size is // exceeded, entries should be evicted according to the LRU policy. sc := NewTableStatisticsCache( + ctx, 2, /* cacheSize */ - gossip.MakeOptionalGossip(s.GossipI().(*gossip.Gossip)), db, ex, keys.SystemSQLCodec, s.LeaseManager().(*lease.Manager), s.ClusterSettings(), + s.RangeFeedFactory().(*rangefeed.Factory), ) for _, tableID := range tableIDs { checkStatsForTable(ctx, t, sc, expectedStats[tableID], tableID) @@ -281,9 +284,8 @@ func TestCacheBasic(t *testing.T) { t.Fatal(err) } - // After refreshing, Table ID 2 should be available immediately in the cache - // for querying, and eventually should contain the updated stat. - sc.RefreshTableStats(ctx, tab2) + // Table ID 2 should be available immediately in the cache for querying, and + // eventually should contain the updated stat. if _, ok := lookupTableStats(ctx, sc, tab2); !ok { t.Fatalf("expected lookup of refreshed key %d to succeed", tab2) } @@ -311,7 +313,6 @@ func TestCacheBasic(t *testing.T) { checkStatsForTable(ctx, t, sc, expectedStats[tab0], tab0) checkStatsForTable(ctx, t, sc, expectedStats[tab1], tab1) - sc.RefreshTableStats(ctx, tab0) // Sleep a bit to give the async refresh process a chance to do something. // Note that this is not flaky - the check below passes even if the refresh is // delayed. @@ -344,13 +345,14 @@ CREATE STATISTICS s FROM tt; _ = kvDB // Make a stats cache. sc := NewTableStatisticsCache( + ctx, 1, - gossip.MakeOptionalGossip(s.GossipI().(*gossip.Gossip)), kvDB, s.InternalExecutor().(sqlutil.InternalExecutor), keys.SystemSQLCodec, s.LeaseManager().(*lease.Manager), s.ClusterSettings(), + s.RangeFeedFactory().(*rangefeed.Factory), ) tbl := catalogkv.TestingGetTableDescriptor(kvDB, keys.SystemSQLCodec, "t", "tt") // Get stats for our table. We are ensuring here that the access to the stats @@ -404,13 +406,14 @@ func TestCacheWait(t *testing.T) { } sort.Sort(tableIDs) sc := NewTableStatisticsCache( + ctx, len(tableIDs), /* cacheSize */ - gossip.MakeOptionalGossip(s.GossipI().(*gossip.Gossip)), db, ex, keys.SystemSQLCodec, s.LeaseManager().(*lease.Manager), s.ClusterSettings(), + s.RangeFeedFactory().(*rangefeed.Factory), ) for _, tableID := range tableIDs { checkStatsForTable(ctx, t, sc, expectedStats[tableID], tableID) @@ -447,3 +450,63 @@ func TestCacheWait(t *testing.T) { } } } + +// TestCacheAutoRefresh verifies that the cache gets refreshed automatically +// when new statistics are added. +func TestCacheAutoRefresh(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + tc := serverutils.StartNewTestCluster(t, 3 /* numNodes */, base.TestClusterArgs{}) + defer tc.Stopper().Stop(ctx) + + s := tc.Server(0) + sc := NewTableStatisticsCache( + ctx, + 10, /* cacheSize */ + s.DB(), + s.InternalExecutor().(sqlutil.InternalExecutor), + keys.SystemSQLCodec, + s.LeaseManager().(*lease.Manager), + s.ClusterSettings(), + s.RangeFeedFactory().(*rangefeed.Factory), + ) + + sr0 := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + sr0.Exec(t, "SET CLUSTER SETTING sql.stats.automatic_collection.enabled = false") + sr0.Exec(t, "CREATE DATABASE test") + sr0.Exec(t, "CREATE TABLE test.t (k INT PRIMARY KEY, v INT)") + sr0.Exec(t, "INSERT INTO test.t VALUES (1, 1), (2, 2), (3, 3)") + + tableDesc := catalogkv.TestingGetTableDescriptor(tc.Server(0).DB(), keys.SystemSQLCodec, "test", "t") + tableID := tableDesc.GetID() + + expectNStats := func(n int) error { + stats, err := sc.GetTableStats(ctx, tableID) + if err != nil { + t.Fatal(err) + } + if len(stats) != n { + return fmt.Errorf("expected %d stats, got: %v", n, stats) + } + return nil + } + + if err := expectNStats(0); err != nil { + t.Fatal(err) + } + sr1 := sqlutils.MakeSQLRunner(tc.ServerConn(1)) + sr1.Exec(t, "CREATE STATISTICS k ON k FROM test.t") + + testutils.SucceedsSoon(t, func() error { + return expectNStats(1) + }) + + sr2 := sqlutils.MakeSQLRunner(tc.ServerConn(2)) + sr2.Exec(t, "CREATE STATISTICS v ON v FROM test.t") + + testutils.SucceedsSoon(t, func() error { + return expectNStats(2) + }) +} diff --git a/pkg/testutils/serverutils/test_server_shim.go b/pkg/testutils/serverutils/test_server_shim.go index 911fbe3473c0..bcd6ee07722f 100644 --- a/pkg/testutils/serverutils/test_server_shim.go +++ b/pkg/testutils/serverutils/test_server_shim.go @@ -100,6 +100,10 @@ type TestServerInterface interface { // The real return type is *gossip.Gossip. GossipI() interface{} + // RangeFeedFactory returns the range feed factory used by the TestServer. + // The real return type is *rangefeed.Factory. + RangeFeedFactory() interface{} + // Clock returns the clock used by the TestServer. Clock() *hlc.Clock