diff --git a/pkg/sql/sqlstats/persistedsqlstats/cluster_settings.go b/pkg/sql/sqlstats/persistedsqlstats/cluster_settings.go
index 2682f1e30379..4cd0cf9950e8 100644
--- a/pkg/sql/sqlstats/persistedsqlstats/cluster_settings.go
+++ b/pkg/sql/sqlstats/persistedsqlstats/cluster_settings.go
@@ -28,6 +28,30 @@ var SQLStatsFlushInterval = settings.RegisterDurationSetting(
 	settings.NonNegativeDurationWithMaximum(time.Hour*24),
 ).WithPublic()
 
+// MinimumInterval is the cluster setting that controls the minimum interval
+// between each flush operation. If flush operations get triggered faster
+// than what is allowed by this setting, (e.g. when too many fingerprints are
+// generated in a short span of time, which in turn causes memory pressure), the
+// flush operation will be aborted.
+var MinimumInterval = settings.RegisterDurationSetting(
+	"sql.stats.flush.minimum_interval",
+	"the minimum interval that SQL stats can be flushed to disk. If a "+
+		"flush operation starts within less than the minimum interval, the flush "+
+		"operation will be aborted",
+	0,
+	settings.NonNegativeDuration,
+)
+
+// DiscardInMemoryStatsWhenFlushDisabled is the cluster setting that allows the
+// older in-memory SQL stats to be discarded when flushing to persisted tables
+// is disabled.
+var DiscardInMemoryStatsWhenFlushDisabled = settings.RegisterBoolSetting(
+	"sql.stats.flush.force_cleanup.enabled",
+	"if set, older SQL stats are discarded periodically when flushing to "+
+		"persisted tables is disabled",
+	false,
+)
+
 // SQLStatsFlushEnabled is the cluster setting that controls if the sqlstats
 // subsystem persists the statistics into system table.
 var SQLStatsFlushEnabled = settings.RegisterBoolSetting(
diff --git a/pkg/sql/sqlstats/persistedsqlstats/flush.go b/pkg/sql/sqlstats/persistedsqlstats/flush.go
index 878c031c6d2d..64c097ed235c 100644
--- a/pkg/sql/sqlstats/persistedsqlstats/flush.go
+++ b/pkg/sql/sqlstats/persistedsqlstats/flush.go
@@ -29,18 +29,57 @@ import (
 // Flush flushes in-memory sql stats into system table. Any errors encountered
 // during the flush will be logged as warning.
+//
+// The flush is skipped entirely when SQLStatsFlushEnabled is false, and it is
+// aborted when attempted sooner than MinimumInterval after the previous flush
+// started. In-memory stats are reset when the flush goes ahead, or when
+// flushing is disabled and DiscardInMemoryStatsWhenFlushDisabled is set; they
+// are kept when the flush is aborted for being too frequent.
 func (s *PersistedSQLStats) Flush(ctx context.Context) {
+	now := s.getTimeNow()
+
+	allowDiscardWhenDisabled := DiscardInMemoryStatsWhenFlushDisabled.Get(&s.cfg.Settings.SV)
+	minimumFlushInterval := MinimumInterval.Get(&s.cfg.Settings.SV)
+
+	enabled := SQLStatsFlushEnabled.Get(&s.cfg.Settings.SV)
+	flushingTooSoon := now.Before(s.lastFlushStarted.Add(minimumFlushInterval))
+
+	// Handle wiping in-memory stats here, we only wipe in-memory stats under 2
+	// circumstances:
+	// 1. flush is enabled, and we are not early aborting the flush due to flushing
+	//    too frequently.
+	// 2. flush is disabled, but we allow discard in-memory stats when disabled.
+	shouldWipeInMemoryStats := enabled && !flushingTooSoon
+	shouldWipeInMemoryStats = shouldWipeInMemoryStats || (!enabled && allowDiscardWhenDisabled)
+
+	if shouldWipeInMemoryStats {
+		defer func() {
+			if err := s.SQLStats.Reset(ctx); err != nil {
+				log.Warningf(ctx, "fail to reset in-memory SQL Stats: %s", err)
+			}
+		}()
+	}
+
+	// Handle early abortion of the flush.
+	if !enabled {
+		return
+	}
+
+	if flushingTooSoon {
+		log.Infof(ctx, "flush aborted due to high flush frequency. "+
+			"The minimum interval between flushes is %s", minimumFlushInterval.String())
+		return
+	}
+
 	log.Infof(ctx, "flushing %d stmt/txn fingerprints (%d bytes) after %s",
 		s.SQLStats.GetTotalFingerprintCount(), s.SQLStats.GetTotalFingerprintBytes(), timeutil.Since(s.lastFlushStarted))
 
 	aggregatedTs := s.ComputeAggregatedTs()
-	s.lastFlushStarted = s.getTimeNow()
+	// Update lastFlushStarted only after the log statement above has reported
+	// the time elapsed since the previous flush.
+	s.lastFlushStarted = now
 
 	s.flushStmtStats(ctx, aggregatedTs)
 	s.flushTxnStats(ctx, aggregatedTs)
-
-	if err := s.SQLStats.Reset(ctx); err != nil {
-		log.Warningf(ctx, "fail to reset in-memory SQL Stats: %s", err)
-	}
 }
 
 func (s *PersistedSQLStats) flushStmtStats(ctx context.Context, aggregatedTs time.Time) {
diff --git a/pkg/sql/sqlstats/persistedsqlstats/flush_test.go b/pkg/sql/sqlstats/persistedsqlstats/flush_test.go
index 0390cc355702..c2d806e2fc86 100644
--- a/pkg/sql/sqlstats/persistedsqlstats/flush_test.go
+++ b/pkg/sql/sqlstats/persistedsqlstats/flush_test.go
@@ -243,6 +243,195 @@ func TestSQLStatsInitialDelay(t *testing.T) {
 		"expected latest nextFlushAt to be %s, but found %s", maxNextRunAt, initialNextFlushAt)
 }
 
+// TestSQLStatsMinimumFlushInterval verifies that a flush attempted within
+// sql.stats.flush.minimum_interval of the previous flush is aborted, and that
+// flushing resumes once the interval has elapsed.
+func TestSQLStatsMinimumFlushInterval(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+	fakeTime := stubTime{
+		aggInterval: time.Hour,
+	}
+	fakeTime.setTime(timeutil.Now())
+
+	params, _ := tests.CreateTestServerParams()
+	params.Knobs.SQLStatsKnobs = &sqlstats.TestingKnobs{
+		StubTimeNow: fakeTime.Now,
+	}
+	s, conn, _ := serverutils.StartServer(t, params)
+
+	defer s.Stopper().Stop(context.Background())
+
+	sqlConn := sqlutils.MakeSQLRunner(conn)
+
+	sqlConn.Exec(t, "SET CLUSTER SETTING sql.stats.flush.minimum_interval = '10m'")
+	sqlConn.Exec(t, "SET application_name = 'min_flush_test'")
+	sqlConn.Exec(t, "SELECT 1")
+
+	s.SQLServer().(*sql.Server).
+		GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats).Flush(ctx)
+
+	sqlConn.CheckQueryResults(t, `
+		SELECT count(*)
+		FROM system.statement_statistics
+		WHERE app_name = 'min_flush_test'
+		`, [][]string{{"1"}})
+
+	sqlConn.CheckQueryResults(t, `
+		SELECT count(*)
+		FROM system.transaction_statistics
+		WHERE app_name = 'min_flush_test'
+		`, [][]string{{"1"}})
+
+	// Since the minimum flush interval was set to 10 minutes above, a subsequent
+	// flush should be a no-op.
+	s.SQLServer().(*sql.Server).
+		GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats).Flush(ctx)
+
+	sqlConn.CheckQueryResults(t, `
+		SELECT count(*)
+		FROM system.statement_statistics
+		WHERE app_name = 'min_flush_test'
+		`, [][]string{{"1"}})
+
+	sqlConn.CheckQueryResults(t, `
+		SELECT count(*)
+		FROM system.transaction_statistics
+		WHERE app_name = 'min_flush_test'
+		`, [][]string{{"1"}})
+
+	// We manually set the time to past the minimum flush interval, now the flush
+	// should succeed.
+	fakeTime.setTime(fakeTime.Now().Add(time.Hour))
+
+	s.SQLServer().(*sql.Server).
+		GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats).Flush(ctx)
+
+	sqlConn.CheckQueryResults(t, `
+		SELECT count(*) > 1
+		FROM system.statement_statistics
+		WHERE app_name = 'min_flush_test'
+		`, [][]string{{"true"}})
+
+}
+
+// TestInMemoryStatsDiscard verifies when in-memory SQL stats are wiped: they
+// are discarded when flushing is disabled and force cleanup is enabled, and
+// they are retained when a flush is aborted for being too frequent.
+func TestInMemoryStatsDiscard(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+	params, _ := tests.CreateTestServerParams()
+	s, conn, _ := serverutils.StartServer(t, params)
+	observer :=
+		serverutils.OpenDBConn(t, s.ServingSQLAddr(), "", false /* insecure */, s.Stopper())
+
+	defer s.Stopper().Stop(context.Background())
+
+	sqlConn := sqlutils.MakeSQLRunner(conn)
+	sqlConn.Exec(t,
+		"SET CLUSTER SETTING sql.stats.flush.minimum_interval = '10m'")
+	observerConn := sqlutils.MakeSQLRunner(observer)
+
+	t.Run("flush_disabled", func(t *testing.T) {
+		sqlConn.Exec(t,
+			"SET CLUSTER SETTING sql.stats.flush.force_cleanup.enabled = true")
+		sqlConn.Exec(t,
+			"SET CLUSTER SETTING sql.stats.flush.enabled = false")
+
+		sqlConn.Exec(t, "SET application_name = 'flush_disabled_test'")
+		sqlConn.Exec(t, "SELECT 1")
+
+		observerConn.CheckQueryResults(t, `
+			SELECT count(*)
+			FROM crdb_internal.statement_statistics
+			WHERE app_name = 'flush_disabled_test'
+			`, [][]string{{"1"}})
+
+		s.SQLServer().(*sql.Server).
+			GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats).Flush(ctx)
+
+		observerConn.CheckQueryResults(t, `
+			SELECT count(*)
+			FROM crdb_internal.statement_statistics
+			WHERE app_name = 'flush_disabled_test'
+			`, [][]string{{"0"}})
+	})
+
+	t.Run("flush_enabled", func(t *testing.T) {
+		// Now turn SQL Stats flush back on. If the flush is aborted due to violating
+		// the minimum flush interval constraint, we should not be clearing in-memory
+		// stats.
+		sqlConn.Exec(t,
+			"SET CLUSTER SETTING sql.stats.flush.enabled = true")
+		sqlConn.Exec(t, "SET application_name = 'flush_enabled_test'")
+		sqlConn.Exec(t, "SELECT 1")
+
+		observerConn.CheckQueryResults(t, `
+			SELECT count(*)
+			FROM crdb_internal.statement_statistics
+			WHERE app_name = 'flush_enabled_test'
+			`, [][]string{{"1"}})
+
+		observerConn.CheckQueryResults(t, `
+			SELECT count(*)
+			FROM crdb_internal.transaction_statistics
+			WHERE app_name = 'flush_enabled_test'
+			`, [][]string{{"1"}})
+
+		// First flush should flush everything into the system tables.
+		s.SQLServer().(*sql.Server).
+			GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats).Flush(ctx)
+
+		observerConn.CheckQueryResults(t, `
+			SELECT count(*)
+			FROM system.statement_statistics
+			WHERE app_name = 'flush_enabled_test'
+			`, [][]string{{"1"}})
+
+		observerConn.CheckQueryResults(t, `
+			SELECT count(*)
+			FROM system.transaction_statistics
+			WHERE app_name = 'flush_enabled_test'
+			`, [][]string{{"1"}})
+
+		sqlConn.Exec(t, "SELECT 1,1")
+
+		// Second flush should be aborted due to violating the minimum flush
+		// interval requirement. Though the data should still remain in-memory.
+		s.SQLServer().(*sql.Server).
+			GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats).Flush(ctx)
+
+		observerConn.CheckQueryResults(t, `
+			SELECT count(*)
+			FROM system.statement_statistics
+			WHERE app_name = 'flush_enabled_test'
+			`, [][]string{{"1"}})
+
+		observerConn.CheckQueryResults(t, `
+			SELECT count(*)
+			FROM system.transaction_statistics
+			WHERE app_name = 'flush_enabled_test'
+			`, [][]string{{"1"}})
+
+		observerConn.CheckQueryResults(t, `
+			SELECT count(*)
+			FROM crdb_internal.statement_statistics
+			WHERE app_name = 'flush_enabled_test'
+			`, [][]string{{"2"}})
+
+		observerConn.CheckQueryResults(t, `
+			SELECT count(*)
+			FROM crdb_internal.transaction_statistics
+			WHERE app_name = 'flush_enabled_test'
+			`, [][]string{{"2"}})
+	})
+}
+
 type stubTime struct {
 	syncutil.RWMutex
 	t time.Time
diff --git a/pkg/sql/sqlstats/persistedsqlstats/provider.go b/pkg/sql/sqlstats/persistedsqlstats/provider.go
index 2bc86ebc73b4..81fe24a48a88 100644
--- a/pkg/sql/sqlstats/persistedsqlstats/provider.go
+++ b/pkg/sql/sqlstats/persistedsqlstats/provider.go
@@ -149,10 +149,7 @@ func (s *PersistedSQLStats) startSQLStatsFlushLoop(ctx context.Context, stopper
 				return
 			}
 
-			enabled := SQLStatsFlushEnabled.Get(&s.cfg.Settings.SV)
-			if enabled {
-				s.Flush(ctx)
-			}
+			s.Flush(ctx)
 		}
 	})
 }
diff --git a/pkg/sql/sqlstats/persistedsqlstats/testdata/stmt_grouping_in_explicit_txn b/pkg/sql/sqlstats/persistedsqlstats/testdata/stmt_grouping_in_explicit_txn
index 834fe91b184c..321e190378e8 100644
--- a/pkg/sql/sqlstats/persistedsqlstats/testdata/stmt_grouping_in_explicit_txn
+++ b/pkg/sql/sqlstats/persistedsqlstats/testdata/stmt_grouping_in_explicit_txn
@@ -170,6 +170,9 @@ WHERE
 WHERE
   jsonb_array_length(metadata -> 'stmtFingerprintIDs') >= 3
 )
+ORDER BY
+  (statistics -> 'statistics' ->> 'cnt')::INT
+ASC
 ----
-df3c70bf7729b433,705fcdf3f12803ec,SELECT _,2
 15c74af7a18bd6da,baa4f7bb278a6105,SELECT _, _,1
+df3c70bf7729b433,705fcdf3f12803ec,SELECT _,2