From f899eeaca56b39b23573047d880b9af25ede5c44 Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Tue, 25 Jul 2023 13:35:15 -0400 Subject: [PATCH] persistedsqlstats: add AOST clause to statement statistics size check This change adds an AOST clause to the statement statistics size check. This helps mitigate contention on the `system.statement_statistics` table which has caused the sql stats compaction job to fail. Release note (bug fix): this change reduces contention on the `system.statement_statistics` table which has caused the SQL statistics compaction job to fail --- pkg/sql/sqlstats/persistedsqlstats/flush.go | 40 +++-------- .../sqlstats/persistedsqlstats/flush_test.go | 72 +++++++++++++++++++ 2 files changed, 82 insertions(+), 30 deletions(-) diff --git a/pkg/sql/sqlstats/persistedsqlstats/flush.go b/pkg/sql/sqlstats/persistedsqlstats/flush.go index 8345c315cfce..30b52ca529f8 100644 --- a/pkg/sql/sqlstats/persistedsqlstats/flush.go +++ b/pkg/sql/sqlstats/persistedsqlstats/flush.go @@ -72,7 +72,12 @@ func (s *PersistedSQLStats) Flush(ctx context.Context) { aggregatedTs := s.ComputeAggregatedTs() - if s.stmtsLimitSizeReached(ctx) || s.txnsLimitSizeReached(ctx) { + // We only check the statement count as there should always be at least as many statements as transactions.
+ limitReached, err := s.StmtsLimitSizeReached(ctx) + if err != nil { + log.Errorf(ctx, "encountered an error at flush, checking for statement statistics size limit: %v", err) + } + if limitReached { log.Infof(ctx, "unable to flush fingerprints because table limit was reached.") } else { var wg sync.WaitGroup @@ -92,7 +97,7 @@ func (s *PersistedSQLStats) Flush(ctx context.Context) { } } -func (s *PersistedSQLStats) stmtsLimitSizeReached(ctx context.Context) bool { +func (s *PersistedSQLStats) StmtsLimitSizeReached(ctx context.Context) (bool, error) { maxPersistedRows := float64(SQLStatsMaxPersistedRows.Get(&s.SQLStats.GetClusterSettings().SV)) readStmt := ` @@ -101,7 +106,7 @@ SELECT FROM system.statement_statistics ` - + readStmt += s.cfg.Knobs.GetAOSTClause() row, err := s.cfg.DB.Executor().QueryRowEx( ctx, "fetch-stmt-count", @@ -111,35 +116,10 @@ FROM ) if err != nil { - return false - } - actualSize := float64(tree.MustBeDInt(row[0])) - return actualSize > (maxPersistedRows * 1.5) -} - -func (s *PersistedSQLStats) txnsLimitSizeReached(ctx context.Context) bool { - maxPersistedRows := float64(SQLStatsMaxPersistedRows.Get(&s.SQLStats.GetClusterSettings().SV)) - - readStmt := ` -SELECT - count(*) -FROM - system.transaction_statistics -` - - row, err := s.cfg.DB.Executor().QueryRowEx( - ctx, - "fetch-txn-count", - nil, - sessiondata.NodeUserSessionDataOverride, - readStmt, - ) - - if err != nil { - return false + return false, err } actualSize := float64(tree.MustBeDInt(row[0])) - return actualSize > (maxPersistedRows * 1.5) + return actualSize > (maxPersistedRows * 1.5), nil } func (s *PersistedSQLStats) flushStmtStats(ctx context.Context, aggregatedTs time.Time) { diff --git a/pkg/sql/sqlstats/persistedsqlstats/flush_test.go b/pkg/sql/sqlstats/persistedsqlstats/flush_test.go index e8547aa7d74c..1ddf5c6f8f80 100644 --- a/pkg/sql/sqlstats/persistedsqlstats/flush_test.go +++ b/pkg/sql/sqlstats/persistedsqlstats/flush_test.go @@ -527,6 +527,64 @@ func 
TestSQLStatsPersistedLimitReached(t *testing.T) { require.Equal(t, txnStatsCountFlush3, txnStatsCountFlush2) } +func TestSQLStatsReadLimitSizeOnLockedTable(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + s, conn, _ := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(context.Background()) + sqlConn := sqlutils.MakeSQLRunner(conn) + sqlConn.Exec(t, `INSERT INTO system.users VALUES ('node', NULL, true, 3); GRANT node TO root`) + waitForFollowerReadTimestamp(t, sqlConn) + pss := s.SQLServer().(*sql.Server).GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats) + + const minNumExpectedStmts = int64(3) + // Maximum number of persisted rows less than minNumExpectedStmts/1.5 + const maxNumPersistedRows = 1 + sqlConn.Exec(t, "SELECT 1") + sqlConn.Exec(t, "SELECT 1, 2") + sqlConn.Exec(t, "SELECT 1, 2, 3") + + pss.Flush(ctx) + stmtStatsCountFlush, _ := countStats(t, sqlConn) + + // Ensure we have some rows in system.statement_statistics + require.GreaterOrEqual(t, stmtStatsCountFlush, minNumExpectedStmts) + + // Set sql.stats.persisted_rows.max + sqlConn.Exec(t, fmt.Sprintf("SET CLUSTER SETTING sql.stats.persisted_rows.max=%d", maxNumPersistedRows)) + testutils.SucceedsSoon(t, func() error { + var appliedSetting int + row := sqlConn.QueryRow(t, "SHOW CLUSTER SETTING sql.stats.persisted_rows.max") + row.Scan(&appliedSetting) + if appliedSetting != maxNumPersistedRows { + return errors.Newf("waiting for sql.stats.persisted_rows.max to be applied") + } + return nil + }) + + // Begin a transaction. + sqlConn.Exec(t, "BEGIN") + // Lock the table. Create a state of contention. + sqlConn.Exec(t, "SELECT * FROM system.statement_statistics FOR UPDATE") + + // Ensure that we can read from the table despite it being locked, due to the follower read (AOST). 
+ // Expect that the number of statements in the table exceeds sql.stats.persisted_rows.max * 1.5 + // (meaning that the limit will be reached) and no error. We need SucceedsSoon here for the follower + // read timestamp to catch up enough for this state to be reached. + testutils.SucceedsSoon(t, func() error { + limitReached, err := pss.StmtsLimitSizeReached(ctx) + if limitReached != true { + return errors.New("waiting for limit reached to be true") + } + return err + }) + + // Close the transaction. + sqlConn.Exec(t, "COMMIT") +} + func TestSQLStatsPlanSampling(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -824,3 +882,17 @@ WHERE require.Equal(t, nodeID, gatewayNodeID, "Gateway NodeID") require.Equal(t, "[1]", allNodesIds, "All NodeIDs from statistics") } + +func waitForFollowerReadTimestamp(t *testing.T, sqlConn *sqlutils.SQLRunner) { + var hlcTimestamp time.Time + sqlConn.QueryRow(t, `SELECT hlc_to_timestamp(cluster_logical_timestamp())`).Scan(&hlcTimestamp) + + testutils.SucceedsSoon(t, func() error { + var followerReadTimestamp time.Time + sqlConn.QueryRow(t, `SELECT follower_read_timestamp()`).Scan(&followerReadTimestamp) + if followerReadTimestamp.Before(hlcTimestamp) { + return errors.New("waiting for follower_read_timestamp to be passed hlc timestamp") + } + return nil + }) +}