Skip to content

Commit

Permalink
Merge pull request #107573 from THardy98/backport23.1-107549
Browse files Browse the repository at this point in the history
release-23.1: persistedsqlstats: add AOST clause to statement statistics size check
  • Loading branch information
maryliag authored Jul 25, 2023
2 parents 4d7fd0e + f899eea commit 9657947
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 30 deletions.
40 changes: 10 additions & 30 deletions pkg/sql/sqlstats/persistedsqlstats/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,12 @@ func (s *PersistedSQLStats) Flush(ctx context.Context) {

aggregatedTs := s.ComputeAggregatedTs()

if s.stmtsLimitSizeReached(ctx) || s.txnsLimitSizeReached(ctx) {
// We only check the statement count as there should always be at least as many statements as transactions.
limitReached, err := s.StmtsLimitSizeReached(ctx)
if err != nil {
log.Errorf(ctx, "encountered an error at flush, checking for statement statistics size limit: %v", err)
}
if limitReached {
log.Infof(ctx, "unable to flush fingerprints because table limit was reached.")
} else {
var wg sync.WaitGroup
Expand All @@ -92,7 +97,7 @@ func (s *PersistedSQLStats) Flush(ctx context.Context) {
}
}

func (s *PersistedSQLStats) stmtsLimitSizeReached(ctx context.Context) bool {
func (s *PersistedSQLStats) StmtsLimitSizeReached(ctx context.Context) (bool, error) {
maxPersistedRows := float64(SQLStatsMaxPersistedRows.Get(&s.SQLStats.GetClusterSettings().SV))

readStmt := `
Expand All @@ -101,7 +106,7 @@ SELECT
FROM
system.statement_statistics
`

readStmt += s.cfg.Knobs.GetAOSTClause()
row, err := s.cfg.DB.Executor().QueryRowEx(
ctx,
"fetch-stmt-count",
Expand All @@ -111,35 +116,10 @@ FROM
)

if err != nil {
return false
}
actualSize := float64(tree.MustBeDInt(row[0]))
return actualSize > (maxPersistedRows * 1.5)
}

func (s *PersistedSQLStats) txnsLimitSizeReached(ctx context.Context) bool {
maxPersistedRows := float64(SQLStatsMaxPersistedRows.Get(&s.SQLStats.GetClusterSettings().SV))

readStmt := `
SELECT
count(*)
FROM
system.transaction_statistics
`

row, err := s.cfg.DB.Executor().QueryRowEx(
ctx,
"fetch-txn-count",
nil,
sessiondata.NodeUserSessionDataOverride,
readStmt,
)

if err != nil {
return false
return false, err
}
actualSize := float64(tree.MustBeDInt(row[0]))
return actualSize > (maxPersistedRows * 1.5)
return actualSize > (maxPersistedRows * 1.5), nil
}

func (s *PersistedSQLStats) flushStmtStats(ctx context.Context, aggregatedTs time.Time) {
Expand Down
72 changes: 72 additions & 0 deletions pkg/sql/sqlstats/persistedsqlstats/flush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,64 @@ func TestSQLStatsPersistedLimitReached(t *testing.T) {
require.Equal(t, txnStatsCountFlush3, txnStatsCountFlush2)
}

func TestSQLStatsReadLimitSizeOnLockedTable(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
s, conn, _ := serverutils.StartServer(t, base.TestServerArgs{})
defer s.Stopper().Stop(context.Background())
sqlConn := sqlutils.MakeSQLRunner(conn)
sqlConn.Exec(t, `INSERT INTO system.users VALUES ('node', NULL, true, 3); GRANT node TO root`)
waitForFollowerReadTimestamp(t, sqlConn)
pss := s.SQLServer().(*sql.Server).GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats)

const minNumExpectedStmts = int64(3)
// Maximum number of persisted rows less than minNumExpectedStmts/1.5
const maxNumPersistedRows = 1
sqlConn.Exec(t, "SELECT 1")
sqlConn.Exec(t, "SELECT 1, 2")
sqlConn.Exec(t, "SELECT 1, 2, 3")

pss.Flush(ctx)
stmtStatsCountFlush, _ := countStats(t, sqlConn)

// Ensure we have some rows in system.statement_statistics
require.GreaterOrEqual(t, stmtStatsCountFlush, minNumExpectedStmts)

// Set sql.stats.persisted_rows.max
sqlConn.Exec(t, fmt.Sprintf("SET CLUSTER SETTING sql.stats.persisted_rows.max=%d", maxNumPersistedRows))
testutils.SucceedsSoon(t, func() error {
var appliedSetting int
row := sqlConn.QueryRow(t, "SHOW CLUSTER SETTING sql.stats.persisted_rows.max")
row.Scan(&appliedSetting)
if appliedSetting != maxNumPersistedRows {
return errors.Newf("waiting for sql.stats.persisted_rows.max to be applied")
}
return nil
})

// Begin a transaction.
sqlConn.Exec(t, "BEGIN")
// Lock the table. Create a state of contention.
sqlConn.Exec(t, "SELECT * FROM system.statement_statistics FOR UPDATE")

// Ensure that we can read from the table despite it being locked, due to the follower read (AOST).
// Expect that the number of statements in the table exceeds sql.stats.persisted_rows.max * 1.5
// (meaning that the limit will be reached) and no error. We need SucceedsSoon here for the follower
// read timestamp to catch up enough for this state to be reached.
testutils.SucceedsSoon(t, func() error {
limitReached, err := pss.StmtsLimitSizeReached(ctx)
if limitReached != true {
return errors.New("waiting for limit reached to be true")
}
return err
})

// Close the transaction.
sqlConn.Exec(t, "COMMIT")
}

func TestSQLStatsPlanSampling(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down Expand Up @@ -824,3 +882,17 @@ WHERE
require.Equal(t, nodeID, gatewayNodeID, "Gateway NodeID")
require.Equal(t, "[1]", allNodesIds, "All NodeIDs from statistics")
}

func waitForFollowerReadTimestamp(t *testing.T, sqlConn *sqlutils.SQLRunner) {
var hlcTimestamp time.Time
sqlConn.QueryRow(t, `SELECT hlc_to_timestamp(cluster_logical_timestamp())`).Scan(&hlcTimestamp)

testutils.SucceedsSoon(t, func() error {
var followerReadTimestamp time.Time
sqlConn.QueryRow(t, `SELECT follower_read_timestamp()`).Scan(&followerReadTimestamp)
if followerReadTimestamp.Before(hlcTimestamp) {
return errors.New("waiting for follower_read_timestamp to be passed hlc timestamp")
}
return nil
})
}

0 comments on commit 9657947

Please sign in to comment.