diff --git a/pkg/sql/sqlstats/persistedsqlstats/cluster_settings.go b/pkg/sql/sqlstats/persistedsqlstats/cluster_settings.go
index eac4a5cee64f..e41a07510a54 100644
--- a/pkg/sql/sqlstats/persistedsqlstats/cluster_settings.go
+++ b/pkg/sql/sqlstats/persistedsqlstats/cluster_settings.go
@@ -136,3 +136,15 @@ var sqlStatsLimitTableSizeEnabled = settings.RegisterBoolSetting(
 	"to grow past sql.stats.persisted_rows.max",
 	true,
 )
+
+// sqlStatsLimitTableCheckInterval is the cluster setting that controls the
+// interval at which we check whether the sql stats system tables have grown
+// past the number of rows set by sql.stats.persisted_rows.max.
+var sqlStatsLimitTableCheckInterval = settings.RegisterDurationSetting(
+	settings.TenantWritable,
+	"sql.stats.limit_table_size.check_interval",
+	"controls at what interval the check is done to see if the statement and "+
+		"transaction statistics tables have grown past sql.stats.persisted_rows.max",
+	1*time.Hour,
+	settings.NonNegativeDuration,
+)
diff --git a/pkg/sql/sqlstats/persistedsqlstats/flush.go b/pkg/sql/sqlstats/persistedsqlstats/flush.go
index 17b729a80a4e..d24070fca519 100644
--- a/pkg/sql/sqlstats/persistedsqlstats/flush.go
+++ b/pkg/sql/sqlstats/persistedsqlstats/flush.go
@@ -13,10 +13,12 @@ package persistedsqlstats
 import (
 	"context"
 	"fmt"
+	"math/rand"
 	"sync"
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/sql/appstatspb"
+	"github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema"
 	"github.com/cockroachdb/cockroach/pkg/sql/isql"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
 	"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
@@ -104,28 +106,51 @@ func (s *PersistedSQLStats) Flush(ctx context.Context) {
 }
 
 func (s *PersistedSQLStats) StmtsLimitSizeReached(ctx context.Context) (bool, error) {
-	maxPersistedRows := float64(SQLStatsMaxPersistedRows.Get(&s.SQLStats.GetClusterSettings().SV))
+	// Doing a count check on every flush for every node adds a lot of overhead.
+	// To reduce the overhead, only do the check once an hour by default.
+	intervalToCheck := sqlStatsLimitTableCheckInterval.Get(&s.cfg.Settings.SV)
+	if !s.lastSizeCheck.IsZero() && s.lastSizeCheck.Add(intervalToCheck).After(timeutil.Now()) {
+		log.Infof(ctx, "PersistedSQLStats.StmtsLimitSizeReached skipped with last check at: %s and check interval: %s", s.lastSizeCheck, intervalToCheck)
+		return false, nil
+	}
+
+	maxPersistedRows := float64(SQLStatsMaxPersistedRows.Get(&s.cfg.Settings.SV))
+
+	// The statistics table is split into 8 shards. Instead of counting all the
+	// rows across all the shards, the count can be limited to a single shard.
+	// Then check the size of that one shard. This reduces the risk of causing
+	// contention or serialization issues. The cleanup is done by shard, so
+	// it should prevent the data from being skewed to a single shard.
+	randomShard := rand.Intn(systemschema.SQLStatsHashShardBucketCount)
+	readStmt := fmt.Sprintf(`SELECT count(*)
+    FROM system.statement_statistics
+    %s
+    WHERE crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_plan_hash_transaction_fingerprint_id_shard_8 = $1
+`, s.cfg.Knobs.GetAOSTClause())
-	readStmt := `
-SELECT
-  count(*)
-FROM
-  system.statement_statistics
-`
-	readStmt += s.cfg.Knobs.GetAOSTClause()
 
 	row, err := s.cfg.DB.Executor().QueryRowEx(
 		ctx,
 		"fetch-stmt-count",
 		nil,
 		sessiondata.NodeUserSessionDataOverride,
 		readStmt,
+		randomShard,
 	)
 
 	if err != nil {
 		return false, err
 	}
 	actualSize := float64(tree.MustBeDInt(row[0]))
-	return actualSize > (maxPersistedRows * 1.5), nil
+	maxPersistedRowsByShard := maxPersistedRows / systemschema.SQLStatsHashShardBucketCount
+	isSizeLimitReached := actualSize > (maxPersistedRowsByShard * 1.5)
+	// If the table is over the limit, do the check on every flush. This allows
+	// the flush to start again as soon as the data is within limits instead of
+	// needing to wait an hour.
+	if !isSizeLimitReached {
+		s.lastSizeCheck = timeutil.Now()
+	}
+
+	return isSizeLimitReached, nil
 }
 
 func (s *PersistedSQLStats) flushStmtStats(ctx context.Context, aggregatedTs time.Time) {
diff --git a/pkg/sql/sqlstats/persistedsqlstats/flush_test.go b/pkg/sql/sqlstats/persistedsqlstats/flush_test.go
index ae0bb1d2737e..8adffd466b0a 100644
--- a/pkg/sql/sqlstats/persistedsqlstats/flush_test.go
+++ b/pkg/sql/sqlstats/persistedsqlstats/flush_test.go
@@ -497,6 +497,18 @@ func TestSQLStatsPersistedLimitReached(t *testing.T) {
 		return nil
 	})
 
+	// Set the table size check interval to a tiny value so the check always runs.
+	sqlConn.Exec(t, "SET CLUSTER SETTING sql.stats.limit_table_size.check_interval='0.00001ms'")
+	testutils.SucceedsSoon(t, func() error {
+		var appliedSetting string
+		row := sqlConn.QueryRow(t, "SHOW CLUSTER SETTING sql.stats.limit_table_size.check_interval")
+		row.Scan(&appliedSetting)
+		if appliedSetting != "00:00:00" {
+			return errors.Newf("waiting for sql.stats.limit_table_size.check_interval to be applied: %s", appliedSetting)
+		}
+		return nil
+	})
+
 	sqlConn.Exec(t, "SELECT 1, 2, 3, 4")
 	sqlConn.Exec(t, "SELECT 1, 2, 3, 4, 5")
 	sqlConn.Exec(t, "SELECT 1, 2, 3, 4, 6, 7")
@@ -538,6 +550,11 @@ func TestSQLStatsReadLimitSizeOnLockedTable(t *testing.T) {
 	waitForFollowerReadTimestamp(t, sqlConn)
 	pss := s.SQLServer().(*sql.Server).GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats)
 
+	// It should be false since nothing has been flushed yet; the table will be empty.
+	limitReached, err := pss.StmtsLimitSizeReached(ctx)
+	require.NoError(t, err)
+	require.False(t, limitReached)
+
 	const minNumExpectedStmts = int64(3)
 	// Maximum number of persisted rows less than minNumExpectedStmts/1.5
 	const maxNumPersistedRows = 1
@@ -563,6 +580,36 @@ func TestSQLStatsReadLimitSizeOnLockedTable(t *testing.T) {
 		return nil
 	})
 
+	// We need SucceedsSoon here for the follower read timestamp to catch up
+	// enough for this state to be reached.
+	testutils.SucceedsSoon(t, func() error {
+		row := sqlConn.QueryRow(t, "SELECT count_rows() FROM system.statement_statistics AS OF SYSTEM TIME follower_read_timestamp()")
+		var rowCount int
+		row.Scan(&rowCount)
+		if rowCount < 3 {
+			return errors.Newf("waiting for AOST query to return results")
+		}
+		return nil
+	})
+
+	// It should still return false because the check only runs once an hour by
+	// default, unless the previous run was over the limit.
+	limitReached, err = pss.StmtsLimitSizeReached(ctx)
+	require.NoError(t, err)
+	require.False(t, limitReached)
+
+	// Set table size check interval to 1 second.
+	sqlConn.Exec(t, "SET CLUSTER SETTING sql.stats.limit_table_size.check_interval='1s'")
+	testutils.SucceedsSoon(t, func() error {
+		var appliedSetting string
+		row := sqlConn.QueryRow(t, "SHOW CLUSTER SETTING sql.stats.limit_table_size.check_interval")
+		row.Scan(&appliedSetting)
+		if appliedSetting != "00:00:01" {
+			return errors.Newf("waiting for sql.stats.limit_table_size.check_interval to be applied: %s", appliedSetting)
+		}
+		return nil
+	})
+
 	// Begin a transaction.
 	sqlConn.Exec(t, "BEGIN")
 	// Lock the table. Create a state of contention.
@@ -570,15 +617,13 @@ func TestSQLStatsReadLimitSizeOnLockedTable(t *testing.T) {
 
 	// Ensure that we can read from the table despite it being locked, due to the follower read (AOST).
 	// Expect that the number of statements in the table exceeds sql.stats.persisted_rows.max * 1.5
-	// (meaning that the limit will be reached) and no error. We need SucceedsSoon here for the follower
-	// read timestamp to catch up enough for this state to be reached.
-	testutils.SucceedsSoon(t, func() error {
+	// (meaning that the limit will be reached) and no error. Loop to make sure that
+	// checking it multiple times still returns the correct value.
+	for i := 0; i < 3; i++ {
 		limitReached, err := pss.StmtsLimitSizeReached(ctx)
-		if limitReached != true {
-			return errors.New("waiting for limit reached to be true")
-		}
-		return err
-	})
+		require.NoError(t, err)
+		require.True(t, limitReached)
+	}
 
 	// Close the transaction.
 	sqlConn.Exec(t, "COMMIT")
diff --git a/pkg/sql/sqlstats/persistedsqlstats/provider.go b/pkg/sql/sqlstats/persistedsqlstats/provider.go
index 80f8eb4fb3c2..c24b86fa6b96 100644
--- a/pkg/sql/sqlstats/persistedsqlstats/provider.go
+++ b/pkg/sql/sqlstats/persistedsqlstats/provider.go
@@ -87,6 +87,9 @@ type PersistedSQLStats struct {
 	setDraining sync.Once
 	// tasksDoneWG is used to wait for all background tasks to finish.
 	tasksDoneWG sync.WaitGroup
+
+	// The last time the size was checked before doing a flush.
+	lastSizeCheck time.Time
 }
 
 var _ sqlstats.Provider = &PersistedSQLStats{}
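// ---------------------------------------------------------------------------
// Reviewer's note (not part of the patch): a minimal, self-contained sketch of
// the pattern the flush.go change introduces, i.e. an interval-gated size
// check that samples one hash shard and compares it against a per-shard
// limit. The names below (sizeChecker, countShard, shardCount) are
// illustrative stand-ins rather than CockroachDB APIs; only the shape of the
// logic mirrors StmtsLimitSizeReached above.
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// shardCount mirrors systemschema.SQLStatsHashShardBucketCount from the patch;
// the statistics table is hash-sharded into this many buckets.
const shardCount = 8

type sizeChecker struct {
	checkInterval time.Duration                    // e.g. sql.stats.limit_table_size.check_interval
	maxRows       float64                          // e.g. sql.stats.persisted_rows.max
	lastSizeCheck time.Time                        // zero until the first passing check
	countShard    func(shard int) (float64, error) // hypothetical row-count callback
}

// limitReached reports whether the sampled shard suggests the table has grown
// past 1.5x its per-shard share of maxRows. While the previous passing check
// is still fresh, the check is skipped entirely, so most flushes avoid the
// count query.
func (c *sizeChecker) limitReached() (bool, error) {
	if !c.lastSizeCheck.IsZero() && time.Since(c.lastSizeCheck) < c.checkInterval {
		return false, nil
	}
	// Count a single randomly chosen shard instead of the whole table.
	rows, err := c.countShard(rand.Intn(shardCount))
	if err != nil {
		return false, err
	}
	reached := rows > (c.maxRows/shardCount)*1.5
	// Only remember a passing check: once the limit is hit, every flush
	// re-checks, so flushing can resume as soon as rows are cleaned up.
	if !reached {
		c.lastSizeCheck = time.Now()
	}
	return reached, nil
}

func main() {
	c := &sizeChecker{
		checkInterval: time.Hour,
		maxRows:       1_000_000,
		countShard:    func(int) (float64, error) { return 50_000, nil }, // stub data source
	}
	reached, _ := c.limitReached()
	fmt.Println("limit reached:", reached) // false: 50,000 is below (1,000,000/8)*1.5
}

// Operators can tune the cadence through the new cluster setting added above,
// for example: SET CLUSTER SETTING sql.stats.limit_table_size.check_interval = '10m';
// ---------------------------------------------------------------------------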