diff --git a/pkg/sql/sqlstats/persistedsqlstats/cluster_settings.go b/pkg/sql/sqlstats/persistedsqlstats/cluster_settings.go index be17f35183ab..07547ee5f747 100644 --- a/pkg/sql/sqlstats/persistedsqlstats/cluster_settings.go +++ b/pkg/sql/sqlstats/persistedsqlstats/cluster_settings.go @@ -29,6 +29,32 @@ var SQLStatsFlushInterval = settings.RegisterDurationSetting( settings.NonNegativeDurationWithMaximum(time.Hour*24), ).WithPublic() +// MinimumFlushInterval is the cluster setting that controls the minimum +// interval between each flush operation. If flush operations get triggered faster +// than what is allowed by this setting (e.g. when too many fingerprints are +// generated in a short span of time, which in turn causes memory pressure), the +// flush operation will be aborted. +var MinimumFlushInterval = settings.RegisterDurationSetting( + settings.TenantWritable, + "sql.stats.flush.minimum_interval", + "the minimum interval that SQL stats can be flushes to disk. If a "+ + "flush operation starts within less than the minimum interval, the flush "+ + "operation will be aborted", + time.Minute*10, + settings.NonNegativeDuration, +) + +// DiscardInMemoryStatsWhenFlushDisabled is the cluster setting that allows the +// older in-memory SQL stats to be discarded when flushing to persisted tables +// is disabled. +var DiscardInMemoryStatsWhenFlushDisabled = settings.RegisterBoolSetting( + settings.TenantWritable, + "sql.stats.flush.discard_in_memory_stats_when_flush_disabled.enabled", + "if set, older SQL stats are discarded periodically when flushing to "+ + "persisted tables is disabled", + false, +) + // SQLStatsFlushEnabled is the cluster setting that controls if the sqlstats // subsystem persists the statistics into system table. 
var SQLStatsFlushEnabled = settings.RegisterBoolSetting( diff --git a/pkg/sql/sqlstats/persistedsqlstats/flush.go b/pkg/sql/sqlstats/persistedsqlstats/flush.go index 4d290242b6a4..f81a7b966168 100644 --- a/pkg/sql/sqlstats/persistedsqlstats/flush.go +++ b/pkg/sql/sqlstats/persistedsqlstats/flush.go @@ -29,18 +29,49 @@ import ( // Flush flushes in-memory sql stats into system table. Any errors encountered // during the flush will be logged as warning. func (s *PersistedSQLStats) Flush(ctx context.Context) { + now := s.getTimeNow() + + allowDiscardWhenDisabled := DiscardInMemoryStatsWhenFlushDisabled.Get(&s.cfg.Settings.SV) + minimumFlushInterval := MinimumFlushInterval.Get(&s.cfg.Settings.SV) + + enabled := SQLStatsFlushEnabled.Get(&s.cfg.Settings.SV) + flushingTooSoon := now.Before(s.lastFlushStarted.Add(minimumFlushInterval)) + + // Handle wiping in-memory stats here, we only wipe in-memory stats under 2 + // circumstances: + // 1. flush is enabled, and we are not early aborting the flush due to flushing + // too frequently. + // 2. flush is disabled, but we allow discard in-memory stats when disabled. + shouldWipeInMemoryStats := enabled && !flushingTooSoon + shouldWipeInMemoryStats = shouldWipeInMemoryStats || (!enabled && allowDiscardWhenDisabled) + + if shouldWipeInMemoryStats { + defer func() { + if err := s.SQLStats.Reset(ctx); err != nil { + log.Warningf(ctx, "fail to reset in-memory SQL Stats: %s", err) + } + }() + } + + // Handle early abortion of the flush. + if !enabled { + return + } + + if flushingTooSoon { + log.Infof(ctx, "flush aborted due to high flush frequency. 
"+ + "The minimum interval between flushes is %s", minimumFlushInterval.String()) + return + } log.Infof(ctx, "flushing %d stmt/txn fingerprints (%d bytes) after %s", s.SQLStats.GetTotalFingerprintCount(), s.SQLStats.GetTotalFingerprintBytes(), timeutil.Since(s.lastFlushStarted)) aggregatedTs := s.ComputeAggregatedTs() - s.lastFlushStarted = s.getTimeNow() + // Updated after the log above so it reports time since the previous flush. + s.lastFlushStarted = now s.flushStmtStats(ctx, aggregatedTs) s.flushTxnStats(ctx, aggregatedTs) - - if err := s.SQLStats.Reset(ctx); err != nil { - log.Warningf(ctx, "fail to reset in-memory SQL Stats: %s", err) - } } func (s *PersistedSQLStats) flushStmtStats(ctx context.Context, aggregatedTs time.Time) { diff --git a/pkg/sql/sqlstats/persistedsqlstats/flush_test.go b/pkg/sql/sqlstats/persistedsqlstats/flush_test.go index 0390cc355702..c553d4c52d5c 100644 --- a/pkg/sql/sqlstats/persistedsqlstats/flush_test.go +++ b/pkg/sql/sqlstats/persistedsqlstats/flush_test.go @@ -102,6 +102,7 @@ func TestSQLStatsFlush(t *testing.T) { pgSecondSQLConn, err := gosql.Open("postgres", secondPgURL.String()) require.NoError(t, err) secondSQLConn := sqlutils.MakeSQLRunner(pgSecondSQLConn) + secondSQLConn.Exec(t, "SET CLUSTER SETTING sql.stats.flush.minimum_interval = '0s'") firstServerSQLStats := firstServer.SQLServer().(*sql.Server).GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats) secondServerSQLStats := secondServer.SQLServer().(*sql.Server).GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats) @@ -243,6 +244,186 @@ func TestSQLStatsInitialDelay(t *testing.T) { "expected latest nextFlushAt to be %s, but found %s", maxNextRunAt, initialNextFlushAt) } +func TestSQLStatsMinimumFlushInterval(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + fakeTime := stubTime{ + aggInterval: time.Hour, + } + fakeTime.setTime(timeutil.Now()) + + params, _ := tests.CreateTestServerParams() + params.Knobs.SQLStatsKnobs = &sqlstats.TestingKnobs{ + StubTimeNow: 
fakeTime.Now, + } + s, conn, _ := serverutils.StartServer(t, params) + + defer s.Stopper().Stop(context.Background()) + + sqlConn := sqlutils.MakeSQLRunner(conn) + + sqlConn.Exec(t, "SET application_name = 'min_flush_test'") + sqlConn.Exec(t, "SELECT 1") + + s.SQLServer().(*sql.Server). + GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats).Flush(ctx) + + sqlConn.CheckQueryResults(t, ` + SELECT count(*) + FROM system.statement_statistics + WHERE app_name = 'min_flush_test' + `, [][]string{{"1"}}) + + sqlConn.CheckQueryResults(t, ` + SELECT count(*) + FROM system.transaction_statistics + WHERE app_name = 'min_flush_test' + `, [][]string{{"1"}}) + + // Since by default, the minimum flush interval is 10 minutes, a subsequent + // flush should be no-op. + s.SQLServer().(*sql.Server). + GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats).Flush(ctx) + + sqlConn.CheckQueryResults(t, ` + SELECT count(*) + FROM system.statement_statistics + WHERE app_name = 'min_flush_test' + `, [][]string{{"1"}}) + + sqlConn.CheckQueryResults(t, ` + SELECT count(*) + FROM system.transaction_statistics + WHERE app_name = 'min_flush_test' + `, [][]string{{"1"}}) + + // We manually set the time to past the minimum flush interval, now the flush + // should succeed. + fakeTime.setTime(fakeTime.Now().Add(time.Hour)) + + s.SQLServer().(*sql.Server). 
+ GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats).Flush(ctx) + + sqlConn.CheckQueryResults(t, ` + SELECT count(*) > 1 + FROM system.statement_statistics + WHERE app_name = 'min_flush_test' + `, [][]string{{"true"}}) + +} + +func TestInMemoryStatsDiscard(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + params, _ := tests.CreateTestServerParams() + s, conn, _ := serverutils.StartServer(t, params) + observer := + serverutils.OpenDBConn(t, s.ServingSQLAddr(), "", false /* insecure */, s.Stopper()) + + defer s.Stopper().Stop(context.Background()) + + sqlConn := sqlutils.MakeSQLRunner(conn) + observerConn := sqlutils.MakeSQLRunner(observer) + + t.Run("flush_disabled", func(t *testing.T) { + sqlConn.Exec(t, + "SET CLUSTER SETTING sql.stats.flush.discard_in_memory_stats_when_flush_disabled.enabled = true") + sqlConn.Exec(t, + "SET CLUSTER SETTING sql.stats.flush.enabled = false") + + sqlConn.Exec(t, "SET application_name = 'flush_disabled_test'") + sqlConn.Exec(t, "SELECT 1") + + observerConn.CheckQueryResults(t, ` + SELECT count(*) + FROM crdb_internal.statement_statistics + WHERE app_name = 'flush_disabled_test' + `, [][]string{{"1"}}) + + s.SQLServer().(*sql.Server). + GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats).Flush(ctx) + + observerConn.CheckQueryResults(t, ` + SELECT count(*) + FROM crdb_internal.statement_statistics + WHERE app_name = 'flush_disabled_test' + `, [][]string{{"0"}}) + }) + + t.Run("flush_enabled", func(t *testing.T) { + // Now turn SQL Stats flush back on. If the flush is aborted due to violating + // the minimum flush interval constraint, we should not be clearing in-memory + // stats. 
+ sqlConn.Exec(t, + "SET CLUSTER SETTING sql.stats.flush.enabled = true") + sqlConn.Exec(t, "SET application_name = 'flush_enabled_test'") + sqlConn.Exec(t, "SELECT 1") + + observerConn.CheckQueryResults(t, ` + SELECT count(*) + FROM crdb_internal.statement_statistics + WHERE app_name = 'flush_enabled_test' + `, [][]string{{"1"}}) + + observerConn.CheckQueryResults(t, ` + SELECT count(*) + FROM crdb_internal.transaction_statistics + WHERE app_name = 'flush_enabled_test' + `, [][]string{{"1"}}) + + // First flush should flush everything into the system tables. + s.SQLServer().(*sql.Server). + GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats).Flush(ctx) + + observerConn.CheckQueryResults(t, ` + SELECT count(*) + FROM system.statement_statistics + WHERE app_name = 'flush_enabled_test' + `, [][]string{{"1"}}) + + observerConn.CheckQueryResults(t, ` + SELECT count(*) + FROM system.transaction_statistics + WHERE app_name = 'flush_enabled_test' + `, [][]string{{"1"}}) + + sqlConn.Exec(t, "SELECT 1,1") + + // Second flush should be aborted due to violating the minimum flush + // interval requirement. Though the data should still remain in-memory. + s.SQLServer().(*sql.Server). 
+ GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats).Flush(ctx) + + observerConn.CheckQueryResults(t, ` + SELECT count(*) + FROM system.statement_statistics + WHERE app_name = 'flush_enabled_test' + `, [][]string{{"1"}}) + + observerConn.CheckQueryResults(t, ` + SELECT count(*) + FROM system.transaction_statistics + WHERE app_name = 'flush_enabled_test' + `, [][]string{{"1"}}) + + observerConn.CheckQueryResults(t, ` + SELECT count(*) + FROM crdb_internal.statement_statistics + WHERE app_name = 'flush_enabled_test' + `, [][]string{{"2"}}) + + observerConn.CheckQueryResults(t, ` + SELECT count(*) + FROM crdb_internal.transaction_statistics + WHERE app_name = 'flush_enabled_test' + `, [][]string{{"2"}}) + }) +} + type stubTime struct { syncutil.RWMutex t time.Time diff --git a/pkg/sql/sqlstats/persistedsqlstats/provider.go b/pkg/sql/sqlstats/persistedsqlstats/provider.go index 3327c23902e4..655940ce9e30 100644 --- a/pkg/sql/sqlstats/persistedsqlstats/provider.go +++ b/pkg/sql/sqlstats/persistedsqlstats/provider.go @@ -146,10 +146,7 @@ func (s *PersistedSQLStats) startSQLStatsFlushLoop(ctx context.Context, stopper return } - enabled := SQLStatsFlushEnabled.Get(&s.cfg.Settings.SV) - if enabled { - s.Flush(ctx) - } + s.Flush(ctx) } }) } diff --git a/pkg/sql/sqlstats/persistedsqlstats/testdata/consistent_flush_timestamp b/pkg/sql/sqlstats/persistedsqlstats/testdata/consistent_flush_timestamp index ba6897be35ca..8b00d73ce135 100644 --- a/pkg/sql/sqlstats/persistedsqlstats/testdata/consistent_flush_timestamp +++ b/pkg/sql/sqlstats/persistedsqlstats/testdata/consistent_flush_timestamp @@ -1,3 +1,7 @@ +exec-sql +SET CLUSTER SETTING sql.stats.flush.minimum_interval = '0s' +---- + exec-sql SET application_name = 'consistent-test' ---- diff --git a/pkg/sql/sqlstats/persistedsqlstats/testdata/stmt_grouping_in_explicit_txn b/pkg/sql/sqlstats/persistedsqlstats/testdata/stmt_grouping_in_explicit_txn index 834fe91b184c..8bdadaa0e32d 100644 --- 
a/pkg/sql/sqlstats/persistedsqlstats/testdata/stmt_grouping_in_explicit_txn +++ b/pkg/sql/sqlstats/persistedsqlstats/testdata/stmt_grouping_in_explicit_txn @@ -1,3 +1,7 @@ +exec-sql +SET CLUSTER SETTING sql.stats.flush.minimum_interval = '0s' +---- + exec-sql SET application_name = 'test' ---- @@ -170,6 +174,9 @@ WHERE WHERE jsonb_array_length(metadata -> 'stmtFingerprintIDs') >= 3 ) +ORDER BY + (statistics -> 'statistics' ->> 'cnt')::INT +ASC ---- -df3c70bf7729b433,705fcdf3f12803ec,SELECT _,2 15c74af7a18bd6da,baa4f7bb278a6105,SELECT _, _,1 +df3c70bf7729b433,705fcdf3f12803ec,SELECT _,2