diff --git a/pkg/sql/sqlstats/persistedsqlstats/compaction_test.go b/pkg/sql/sqlstats/persistedsqlstats/compaction_test.go
index d38f3cfcf15c..0705f7ea3565 100644
--- a/pkg/sql/sqlstats/persistedsqlstats/compaction_test.go
+++ b/pkg/sql/sqlstats/persistedsqlstats/compaction_test.go
@@ -267,7 +267,7 @@ func TestSQLStatsCompactor(t *testing.T) {
 // SQL Stats cleanup job. We test this behavior by generating some rows in the
 // stats system table that are in the current aggregation window and previous
 // aggregation window. Before running the SQL Stats compaction, we lower the
-// row limit in the stats table so that all thw rows will be deleted by the
+// row limit in the stats table so that all the rows will be deleted by the
 // StatsCompactor, if all the generated rows live outside the current
 // aggregation window. This test asserts that, since some of generated rows live
 // in the current aggregation interval, those rows will not be deleted by the
@@ -293,7 +293,7 @@ func TestSQLStatsForegroundInterference(t *testing.T) {
 		GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats)
 	sqlConn := sqlutils.MakeSQLRunner(conn)
 
-	sqlConn.Exec(t, "SET CLUSTER SETTING sql.stats.persisted_rows.max = 1")
+	sqlConn.Exec(t, "SET CLUSTER SETTING sql.stats.persisted_rows.max = 10")
 
 	// Generate some data that are older than the current aggregation window,
 	// and then generate some that are within the current aggregation window.
diff --git a/pkg/sql/sqlstats/persistedsqlstats/flush.go b/pkg/sql/sqlstats/persistedsqlstats/flush.go
index f3791e9beca2..168860f905ba 100644
--- a/pkg/sql/sqlstats/persistedsqlstats/flush.go
+++ b/pkg/sql/sqlstats/persistedsqlstats/flush.go
@@ -71,8 +71,62 @@ func (s *PersistedSQLStats) Flush(ctx context.Context) {
 
 	aggregatedTs := s.ComputeAggregatedTs()
 
-	s.flushStmtStats(ctx, aggregatedTs)
-	s.flushTxnStats(ctx, aggregatedTs)
+	if s.stmtsLimitSizeReached(ctx) || s.txnsLimitSizeReached(ctx) {
+		log.Infof(ctx, "unable to flush fingerprints because table limit was reached.")
+	} else {
+		s.flushStmtStats(ctx, aggregatedTs)
+		s.flushTxnStats(ctx, aggregatedTs)
+	}
+}
+
+func (s *PersistedSQLStats) stmtsLimitSizeReached(ctx context.Context) bool {
+	maxPersistedRows := float64(SQLStatsMaxPersistedRows.Get(&s.SQLStats.GetClusterSettings().SV))
+
+	readStmt := `
+SELECT
+	count(*)
+FROM
+	system.statement_statistics
+`
+
+	row, err := s.cfg.DB.Executor().QueryRowEx(
+		ctx,
+		"fetch-stmt-count",
+		nil,
+		sessiondata.NodeUserSessionDataOverride,
+		readStmt,
+	)
+
+	if err != nil {
+		return false
+	}
+	actualSize := float64(tree.MustBeDInt(row[0]))
+	return actualSize > (maxPersistedRows * 1.5)
+}
+
+func (s *PersistedSQLStats) txnsLimitSizeReached(ctx context.Context) bool {
+	maxPersistedRows := float64(SQLStatsMaxPersistedRows.Get(&s.SQLStats.GetClusterSettings().SV))
+
+	readStmt := `
+SELECT
+	count(*)
+FROM
+	system.transaction_statistics
+`
+
+	row, err := s.cfg.DB.Executor().QueryRowEx(
+		ctx,
+		"fetch-txn-count",
+		nil,
+		sessiondata.NodeUserSessionDataOverride,
+		readStmt,
+	)
+
+	if err != nil {
+		return false
+	}
+	actualSize := float64(tree.MustBeDInt(row[0]))
+	return actualSize > (maxPersistedRows * 1.5)
 }
 
 func (s *PersistedSQLStats) flushStmtStats(ctx context.Context, aggregatedTs time.Time) {
diff --git a/pkg/sql/sqlstats/persistedsqlstats/flush_test.go b/pkg/sql/sqlstats/persistedsqlstats/flush_test.go
index 37711ea8b67b..6a1eb4b1e818 100644
--- a/pkg/sql/sqlstats/persistedsqlstats/flush_test.go
+++ b/pkg/sql/sqlstats/persistedsqlstats/flush_test.go
@@ -27,6 +27,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats"
 	"github.com/cockroachdb/cockroach/pkg/sql/tests"
 	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
+	"github.com/cockroachdb/cockroach/pkg/testutils/skip"
 	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -457,6 +458,73 @@ func TestSQLStatsGatewayNodeSetting(t *testing.T) {
 	verifyNodeID(t, sqlConn, "SELECT _", false, "gateway_disabled")
 }
 
+func TestSQLStatsPersistedLimitReached(t *testing.T) {
+	skip.UnderStress(t, "During stress, several flushes can be done at the same time, and we don't "+
+		"want to test this case for now, because the limit could be more than 1.5 * maxMemory")
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+	params, _ := tests.CreateTestServerParams()
+	s, conn, _ := serverutils.StartServer(t, params)
+	defer s.Stopper().Stop(context.Background())
+	sqlConn := sqlutils.MakeSQLRunner(conn)
+
+	sqlConn.Exec(t, "set cluster setting sql.stats.persisted_rows.max=8")
+	sqlConn.Exec(t, "set cluster setting sql.metrics.max_mem_stmt_fingerprints=3")
+	sqlConn.Exec(t, "set cluster setting sql.metrics.max_mem_txn_fingerprints=3")
+
+	// Clean up data generated during the test setup.
+	sqlConn.Exec(t, "SELECT crdb_internal.reset_sql_stats()")
+
+	testCases := []struct {
+		query string
+	}{
+		{query: "SELECT 1"},
+		{query: "SELECT 1, 2"},
+		{query: "SELECT 1, 2, 3"},
+		{query: "SELECT 1, 2, 3, 4"},
+		{query: "SELECT 1, 2, 3, 4, 5"},
+		{query: "SELECT 1, 2, 3, 4, 5, 6"},
+		{query: "SELECT 1, 2, 3, 4, 5, 6, 7"},
+		{query: "SELECT 1, 2, 3, 4, 5, 6, 7, 8"},
+		{query: "SELECT 1, 2, 3, 4, 5, 6, 7, 8, 9"},
+		{query: "SELECT 1, 2, 3, 4, 5, 6, 7, 8, 9, 10"},
+		{query: "SELECT 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11"},
+		{query: "SELECT 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12"},
+		{query: "SELECT 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13"},
+		{query: "SELECT 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14"},
+		{query: "SELECT 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15"},
+		{query: "SELECT 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16"},
+		{query: "SELECT 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17"},
+	}
+
+	var count int64
+	for _, tc := range testCases {
+		sqlConn.Exec(t, tc.query)
+		s.SQLServer().(*sql.Server).
+			GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats).Flush(ctx)
+
+		// We can flush data if the size of the table is less than 1.5 times the
+		// value of sql.stats.persisted_rows.max.
+		// If we flush, we add up to sql.metrics.max_mem_stmt_fingerprints new rows,
+		// so the maximum number of rows that can exist in the system table is
+		// sql.stats.persisted_rows.max * 1.5 + sql.metrics.max_mem_stmt_fingerprints:
+		// 8 * 1.5 + 3 = 15.
+		rows := sqlConn.QueryRow(t, `
+		SELECT count(*)
+		FROM system.statement_statistics`)
+		rows.Scan(&count)
+		require.LessOrEqual(t, count, int64(15))
+
+		rows = sqlConn.QueryRow(t, `
+		SELECT count(*)
+		FROM system.transaction_statistics`)
+		rows.Scan(&count)
+		require.LessOrEqual(t, count, int64(15))
+	}
+}
+
 type stubTime struct {
 	syncutil.RWMutex
 	t time.Time
diff --git a/pkg/sql/sqlstats/sslocal/sql_stats.go b/pkg/sql/sqlstats/sslocal/sql_stats.go
index 9866d03b5f10..1456ba560116 100644
--- a/pkg/sql/sqlstats/sslocal/sql_stats.go
+++ b/pkg/sql/sqlstats/sslocal/sql_stats.go
@@ -108,7 +108,7 @@ func newSQLStats(
 }
 
 // GetTotalFingerprintCount returns total number of unique statement and
-// transaction fingerprints stored in the currnet SQLStats.
+// transaction fingerprints stored in the current SQLStats.
 func (s *SQLStats) GetTotalFingerprintCount() int64 {
 	return atomic.LoadInt64(&s.atomic.uniqueStmtFingerprintCount) + atomic.LoadInt64(&s.atomic.uniqueTxnFingerprintCount)
 }
@@ -194,3 +194,7 @@ func (s *SQLStats) resetAndMaybeDumpStats(ctx context.Context, target Sink) (err
 
 	return err
 }
+
+func (s *SQLStats) GetClusterSettings() *cluster.Settings {
+	return s.st
+}
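
For readers who want the gist of the gating rule without tracing the diff, here is a minimal, self-contained Go sketch of the behavior the patch adds to Flush: skip the flush whenever either stats system table already holds more than 1.5x sql.stats.persisted_rows.max rows, and treat a failed count as "not over the limit". Everything in the sketch (rowCounter, limitReached, shouldFlush, the hard-coded counts) is an illustrative stand-in, not CockroachDB API.

package main

import "fmt"

// rowCounter stands in for the count(*) queries the patch issues against
// system.statement_statistics and system.transaction_statistics.
type rowCounter func() (int64, error)

// limitReached mirrors the shape of stmtsLimitSizeReached/txnsLimitSizeReached:
// if the count fails, report false so an error never blocks the flush.
func limitReached(count rowCounter, maxPersistedRows int64) bool {
	n, err := count()
	if err != nil {
		return false
	}
	return float64(n) > float64(maxPersistedRows)*1.5
}

// shouldFlush applies the same OR of the two per-table checks that Flush uses.
func shouldFlush(stmtRows, txnRows rowCounter, maxPersistedRows int64) bool {
	return !(limitReached(stmtRows, maxPersistedRows) || limitReached(txnRows, maxPersistedRows))
}

func main() {
	const maxPersistedRows = 8 // same value the test sets for sql.stats.persisted_rows.max

	stmtRows := func() (int64, error) { return 13, nil } // 13 > 8*1.5 = 12, so over the cap
	txnRows := func() (int64, error) { return 5, nil }

	fmt.Println(shouldFlush(stmtRows, txnRows, maxPersistedRows)) // prints false
}

The error-swallowing choice in limitReached matches the patch: a transient failure while counting rows degrades to the previous always-flush behavior instead of silently dropping in-memory stats.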