Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
62563: sql: use a range feed for refreshing the table stats cache r=RaduBerinde a=RaduBerinde

We currently use gossip to keep table stats caches updated when new
statistics become available. This mechanism is not available in
multi-tenant configurations (currently we only support single SQL
nodes in that configuration and we refresh the cache on the same node
out of band).

This commit changes this mechanism to use a range feed that listens
for updates to system.table_statistics. This is a cleaner mechanism as
it doesn't require any out-of-band coordination.

Range feed events are delivered row by row, but a statistics
collection typically inserts multiple rows. We handle this by
remembering the timestamp of the table_statistics row that triggered
the last update. In addition, we remember information about the last
event to avoid unnecessary overhead.

Note that we continue to send gossip updates when new statistics are
available, to keep things working for mixed-version clusters during
upgrade.

Fixes cockroachdb#47925.

Release note: None

Co-authored-by: Radu Berinde <[email protected]>
  • Loading branch information
craig[bot] and RaduBerinde committed Mar 28, 2021
2 parents 3cfe2a3 + 8905b9b commit 4f06297
Show file tree
Hide file tree
Showing 15 changed files with 230 additions and 183 deletions.
16 changes: 0 additions & 16 deletions pkg/gossip/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,22 +186,6 @@ func MakeTableStatAddedKey(tableID uint32) string {
return MakeKey(KeyTableStatAddedPrefix, strconv.FormatUint(uint64(tableID), 10 /* base */))
}

// TableIDFromTableStatAddedKey recovers the table ID encoded in a key that
// was built by MakeTableStatAddedKey.
//
// It strips KeyTableStatAddedPrefix from the key and parses the remainder as
// a base-10, 32-bit unsigned integer. An error is returned if the prefix
// cannot be removed or if the remainder does not parse.
func TableIDFromTableStatAddedKey(key string) (uint32, error) {
	suffix, prefixErr := removePrefixFromKey(key, KeyTableStatAddedPrefix)
	if prefixErr != nil {
		return 0, prefixErr
	}

	const (
		base    = 10
		bitSize = 32
	)
	id, parseErr := strconv.ParseUint(suffix, base, bitSize)
	if parseErr != nil {
		return 0, errors.Wrapf(parseErr, "failed parsing table ID from key %q", key)
	}
	return uint32(id), nil
}

// removePrefixFromKey removes the key prefix and separator and returns what's
// left. Returns an error if the key doesn't have this prefix.
func removePrefixFromKey(key, prefix string) (string, error) {
Expand Down
9 changes: 7 additions & 2 deletions pkg/kv/kvclient/rangefeed/rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,13 @@ func newFactory(stopper *stop.Stopper, client kvDB, knobs *TestingKnobs) *Factor
}
}

// RangeFeed constructs a new RangeFeed. The only error which can be returned
// will indicate that the server is being shut down.
// RangeFeed constructs a new RangeFeed.
//
// The range feed can be stopped via Close(); otherwise, it will stop when the
// server shuts down.
//
// The only error which can be returned will indicate that the server is being
// shut down.
func (f *Factory) RangeFeed(
ctx context.Context,
name string,
Expand Down
3 changes: 2 additions & 1 deletion pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,13 +522,14 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
),

TableStatsCache: stats.NewTableStatisticsCache(
ctx,
cfg.TableStatCacheSize,
cfg.gossip,
cfg.db,
cfg.circularInternalExecutor,
codec,
leaseMgr,
cfg.Settings,
cfg.rangeFeedFactory,
),

QueryCache: querycache.New(cfg.QueryCacheSize),
Expand Down
8 changes: 8 additions & 0 deletions pkg/server/testserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,14 @@ func (ts *TestServer) Gossip() *gossip.Gossip {
return nil
}

// RangeFeedFactory is part of serverutils.TestServerInterface.
//
// The factory is returned as an interface{}. On a nil receiver the result
// holds a typed nil *rangefeed.Factory rather than an untyped nil, so a type
// assertion to *rangefeed.Factory on the result still succeeds.
func (ts *TestServer) RangeFeedFactory() interface{} {
	if ts == nil {
		return (*rangefeed.Factory)(nil)
	}
	return ts.sqlServer.execCfg.RangeFeedFactory
}

// Clock returns the clock used by the TestServer.
func (ts *TestServer) Clock() *hlc.Clock {
if ts != nil {
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/catalog/lease/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -1853,6 +1853,8 @@ func (m *Manager) watchForRangefeedUpdates(
}
}
// Ignore errors here because they indicate that the server is shutting down.
// Also note that the range feed automatically shuts down when the server
// shuts down, so we don't need to call Close() ourselves.
_, _ = m.rangeFeedFactory.RangeFeed(
ctx, "lease", descriptorTableSpan, m.getResolvedTimestamp(), handleEvent,
)
Expand Down
6 changes: 0 additions & 6 deletions pkg/sql/create_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,12 +565,6 @@ func (r *createStatsResumer) Resume(ctx context.Context, execCtx interface{}) er
}); err != nil {
return err
}

// Refresh the local cache if Gossip is not available.
if _, ok := evalCtx.ExecCfg.Gossip.Optional(47925); !ok {
evalCtx.ExecCfg.TableStatsCache.RefreshTableStats(ctx, r.tableID)
}

// Record this statistics creation in the event log.
if !createStatsPostEvents.Get(&evalCtx.Settings.SV) {
return nil
Expand Down
6 changes: 4 additions & 2 deletions pkg/sql/stats/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ go_library(
"//pkg/gossip",
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvclient/rangefeed",
"//pkg/roachpb",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/descs",
"//pkg/sql/catalog/lease",
"//pkg/sql/catalog/systemschema",
"//pkg/sql/opt/cat",
"//pkg/sql/parser",
"//pkg/sql/pgwire/pgcode",
Expand All @@ -37,6 +39,7 @@ go_library(
"//pkg/util",
"//pkg/util/cache",
"//pkg/util/encoding",
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/mon",
"//pkg/util/protoutil",
Expand All @@ -56,7 +59,6 @@ go_test(
"automatic_stats_test.go",
"create_stats_job_test.go",
"delete_stats_test.go",
"gossip_invalidation_test.go",
"histogram_test.go",
"main_test.go",
"row_sampling_test.go",
Expand All @@ -65,11 +67,11 @@ go_test(
embed = [":stats"],
deps = [
"//pkg/base",
"//pkg/gossip",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvclient/rangefeed",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/roachpb",
Expand Down
17 changes: 9 additions & 8 deletions pkg/sql/stats/automatic_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
Expand Down Expand Up @@ -61,13 +61,14 @@ func TestMaybeRefreshStats(t *testing.T) {
executor := s.InternalExecutor().(sqlutil.InternalExecutor)
descA := catalogkv.TestingGetTableDescriptor(s.DB(), keys.SystemSQLCodec, "t", "a")
cache := NewTableStatisticsCache(
ctx,
10, /* cacheSize */
gossip.MakeOptionalGossip(s.GossipI().(*gossip.Gossip)),
kvDB,
executor,
keys.SystemSQLCodec,
s.LeaseManager().(*lease.Manager),
s.ClusterSettings(),
s.RangeFeedFactory().(*rangefeed.Factory),
)
refresher := MakeRefresher(st, executor, cache, time.Microsecond /* asOfTime */)

Expand Down Expand Up @@ -140,13 +141,14 @@ func TestAverageRefreshTime(t *testing.T) {
executor := s.InternalExecutor().(sqlutil.InternalExecutor)
tableID := catalogkv.TestingGetTableDescriptor(s.DB(), keys.SystemSQLCodec, "t", "a").GetID()
cache := NewTableStatisticsCache(
ctx,
10, /* cacheSize */
gossip.MakeOptionalGossip(s.GossipI().(*gossip.Gossip)),
kvDB,
executor,
keys.SystemSQLCodec,
s.LeaseManager().(*lease.Manager),
s.ClusterSettings(),
s.RangeFeedFactory().(*rangefeed.Factory),
)
refresher := MakeRefresher(st, executor, cache, time.Microsecond /* asOfTime */)

Expand All @@ -156,7 +158,6 @@ func TestAverageRefreshTime(t *testing.T) {
curTime := timeutil.Now()

checkAverageRefreshTime := func(expected time.Duration) error {
cache.RefreshTableStats(ctx, tableID)
return testutils.SucceedsSoonError(func() error {
stats, err := cache.GetTableStats(ctx, tableID)
if err != nil {
Expand All @@ -173,7 +174,6 @@ func TestAverageRefreshTime(t *testing.T) {
// Checks that the most recent statistic was created less than (greater than)
// expectedAge time ago if lessThan is true (false).
checkMostRecentStat := func(expectedAge time.Duration, lessThan bool) error {
cache.RefreshTableStats(ctx, tableID)
return testutils.SucceedsSoonError(func() error {
stats, err := cache.GetTableStats(ctx, tableID)
if err != nil {
Expand Down Expand Up @@ -388,13 +388,14 @@ func TestAutoStatsReadOnlyTables(t *testing.T) {

executor := s.InternalExecutor().(sqlutil.InternalExecutor)
cache := NewTableStatisticsCache(
ctx,
10, /* cacheSize */
gossip.MakeOptionalGossip(s.GossipI().(*gossip.Gossip)),
kvDB,
executor,
keys.SystemSQLCodec,
s.LeaseManager().(*lease.Manager),
s.ClusterSettings(),
s.RangeFeedFactory().(*rangefeed.Factory),
)
refresher := MakeRefresher(st, executor, cache, time.Microsecond /* asOfTime */)

Expand Down Expand Up @@ -435,13 +436,14 @@ func TestNoRetryOnFailure(t *testing.T) {

executor := s.InternalExecutor().(sqlutil.InternalExecutor)
cache := NewTableStatisticsCache(
ctx,
10, /* cacheSize */
gossip.MakeOptionalGossip(s.GossipI().(*gossip.Gossip)),
kvDB,
executor,
keys.SystemSQLCodec,
s.LeaseManager().(*lease.Manager),
s.ClusterSettings(),
s.RangeFeedFactory().(*rangefeed.Factory),
)
r := MakeRefresher(st, executor, cache, time.Microsecond /* asOfTime */)

Expand Down Expand Up @@ -519,7 +521,6 @@ func TestDefaultColumns(t *testing.T) {
func checkStatsCount(
ctx context.Context, cache *TableStatisticsCache, tableID descpb.ID, expected int,
) error {
cache.RefreshTableStats(ctx, tableID)
return testutils.SucceedsSoonError(func() error {
stats, err := cache.GetTableStats(ctx, tableID)
if err != nil {
Expand Down
13 changes: 3 additions & 10 deletions pkg/sql/stats/delete_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
Expand All @@ -40,13 +40,14 @@ func TestDeleteOldStatsForColumns(t *testing.T) {
defer s.Stopper().Stop(ctx)
ex := s.InternalExecutor().(sqlutil.InternalExecutor)
cache := NewTableStatisticsCache(
ctx,
10, /* cacheSize */
gossip.MakeOptionalGossip(s.GossipI().(*gossip.Gossip)),
db,
ex,
keys.SystemSQLCodec,
s.LeaseManager().(*lease.Manager),
s.ClusterSettings(),
s.RangeFeedFactory().(*rangefeed.Factory),
)

// The test data must be ordered by CreatedAt DESC so the calculated set of
Expand Down Expand Up @@ -270,14 +271,6 @@ func TestDeleteOldStatsForColumns(t *testing.T) {
return err
}

cache.RefreshTableStats(ctx, tableID)
for i := range testData {
stat := &testData[i]
if stat.TableID != tableID {
cache.RefreshTableStats(ctx, stat.TableID)
}
}

return testutils.SucceedsSoonError(func() error {
tableStats, err := cache.GetTableStats(ctx, tableID)
if err != nil {
Expand Down
88 changes: 0 additions & 88 deletions pkg/sql/stats/gossip_invalidation_test.go

This file was deleted.

2 changes: 2 additions & 0 deletions pkg/sql/stats/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ import (
"github.com/cockroachdb/cockroach/pkg/security/securitytest"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
)

// TestMain is the test entry point for this package: it performs one-time
// setup and then exits the process with the status returned by m.Run().
func TestMain(m *testing.M) {
// Install the embedded test assets as the security asset loader.
security.SetAssetLoader(securitytest.EmbeddedAssets)
randutil.SeedForTests()
// Register both the test server and test cluster factories with serverutils
// before any test runs.
serverutils.InitTestServerFactory(server.TestServerFactory)
serverutils.InitTestClusterFactory(testcluster.TestClusterFactory)
// os.Exit does not return, so all setup must happen above this line.
os.Exit(m.Run())
}
5 changes: 5 additions & 0 deletions pkg/sql/stats/new_stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ func InsertNewStat(

// GossipTableStatAdded causes the statistic caches for this table to be
// invalidated.
//
// Note that we no longer use gossip to keep the cache up-to-date, but we still
// send the updates for mixed-version clusters during upgrade.
//
// TODO(radu): remove this in 22.1.
func GossipTableStatAdded(g *gossip.Gossip, tableID descpb.ID) error {
// TODO(radu): perhaps use a TTL here to avoid having a key per table floating
// around forever (we would need the stat cache to evict old entries
Expand Down
Loading

0 comments on commit 4f06297

Please sign in to comment.