From b8d98548caa6f93821ac2b7c97be1812ff2a64e8 Mon Sep 17 00:00:00 2001 From: Azhng Date: Wed, 23 Mar 2022 21:24:13 +0000 Subject: [PATCH] sql: new SQL Stats cluster settings to improve write traffic Resolves #78339 Previously, SQL Stats flushed to system table as soon as the in-memory buffer is full. This means the size of the system tables that back SQL Stats could grow faster than the cleanup job. Additionally, when the SQL Stats flush is disabled, the SQL Stats is unable to collect any more new statement / transaction statistics when the in-memory store is full. This commit introduces two non-public cluster settings: * `sql.stats.flush.minimum_interval`: this setting limits minimum interval between each flush operation. If a flush operation is triggered sooner than what is allowed by the minimum interval, (e.g. when the in-memory SQL Stats store is full), the flush operation is aborted. * `sql.stats.flush.periodically_discard_when_disabled`: which allows the in-memory SQL Stats to be cleared at the interval specified by `sql.stats.flush.interval`, even if the SQL Stats flush is disabled. Release note: None --- .../persistedsqlstats/cluster_settings.go | 26 +++ pkg/sql/sqlstats/persistedsqlstats/flush.go | 41 +++- .../sqlstats/persistedsqlstats/flush_test.go | 181 ++++++++++++++++++ .../sqlstats/persistedsqlstats/provider.go | 5 +- .../testdata/consistent_flush_timestamp | 4 + .../testdata/stmt_grouping_in_explicit_txn | 9 +- 6 files changed, 256 insertions(+), 10 deletions(-) diff --git a/pkg/sql/sqlstats/persistedsqlstats/cluster_settings.go b/pkg/sql/sqlstats/persistedsqlstats/cluster_settings.go index be17f35183ab..07547ee5f747 100644 --- a/pkg/sql/sqlstats/persistedsqlstats/cluster_settings.go +++ b/pkg/sql/sqlstats/persistedsqlstats/cluster_settings.go @@ -29,6 +29,32 @@ var SQLStatsFlushInterval = settings.RegisterDurationSetting( settings.NonNegativeDurationWithMaximum(time.Hour*24), ).WithPublic() +// MinimumFlushInterval is the cluster setting that controls the minimum +// between each flush operation. If flush operations get triggered faster +// than what is allowed by this setting, (e.g. when too many fingerprints are +// generated in a short span of time, which in turn cause memory pressure), the +// flush operation will be aborted. +var MinimumFlushInterval = settings.RegisterDurationSetting( + settings.TenantWritable, + "sql.stats.flush.minimum_interval", + "the minimum interval that SQL stats can be flushes to disk. If a "+ + "flush operation starts within less than the minimum interval, the flush "+ + "operation will be aborted", + time.Minute*10, + settings.NonNegativeDuration, +) + +// DiscardInMemoryStatsWhenFlushDisabled is the cluster setting that allows the +// older in-memory SQL stats to be discarded when flushing to persisted tables +// is disabled. +var DiscardInMemoryStatsWhenFlushDisabled = settings.RegisterBoolSetting( + settings.TenantWritable, + "sql.stats.flush.discard_in_memory_stats_when_flush_disabled.enabled", + "if set, older SQL stats are discarded periodically when flushing to "+ + "persisted tables is disabled", + false, +) + // SQLStatsFlushEnabled is the cluster setting that controls if the sqlstats // subsystem persists the statistics into system table. var SQLStatsFlushEnabled = settings.RegisterBoolSetting( diff --git a/pkg/sql/sqlstats/persistedsqlstats/flush.go b/pkg/sql/sqlstats/persistedsqlstats/flush.go index 4d290242b6a4..f81a7b966168 100644 --- a/pkg/sql/sqlstats/persistedsqlstats/flush.go +++ b/pkg/sql/sqlstats/persistedsqlstats/flush.go @@ -29,18 +29,49 @@ import ( // Flush flushes in-memory sql stats into system table. Any errors encountered // during the flush will be logged as warning. func (s *PersistedSQLStats) Flush(ctx context.Context) { + now := s.getTimeNow() + + allowDiscardWhenDisabled := DiscardInMemoryStatsWhenFlushDisabled.Get(&s.cfg.Settings.SV) + minimumFlushInterval := MinimumFlushInterval.Get(&s.cfg.Settings.SV) + + enabled := SQLStatsFlushEnabled.Get(&s.cfg.Settings.SV) + flushingTooSoon := now.Before(s.lastFlushStarted.Add(minimumFlushInterval)) + + // Handle wiping in-memory stats here, we only wipe in-memory stats under 2 + // circumstances: + // 1. flush is enabled, and we are not early aborting the flush due to flushing + // too frequently. + // 2. flush is disabled, but we allow discard in-memory stats when disabled. + shouldWipeInMemoryStats := enabled && !flushingTooSoon + shouldWipeInMemoryStats = shouldWipeInMemoryStats || (!enabled && allowDiscardWhenDisabled) + + if shouldWipeInMemoryStats { + defer func() { + if err := s.SQLStats.Reset(ctx); err != nil { + log.Warningf(ctx, "fail to reset in-memory SQL Stats: %s", err) + } + }() + } + + // Handle early abortion of the flush. + if !enabled { + return + } + + if flushingTooSoon { + log.Infof(ctx, "flush aborted due to high flush frequency. "+ + "The minimum interval between flushes is %s", minimumFlushInterval.String()) + return + } + + s.lastFlushStarted = now log.Infof(ctx, "flushing %d stmt/txn fingerprints (%d bytes) after %s", s.SQLStats.GetTotalFingerprintCount(), s.SQLStats.GetTotalFingerprintBytes(), timeutil.Since(s.lastFlushStarted)) aggregatedTs := s.ComputeAggregatedTs() - s.lastFlushStarted = s.getTimeNow() s.flushStmtStats(ctx, aggregatedTs) s.flushTxnStats(ctx, aggregatedTs) - - if err := s.SQLStats.Reset(ctx); err != nil { - log.Warningf(ctx, "fail to reset in-memory SQL Stats: %s", err) - } } func (s *PersistedSQLStats) flushStmtStats(ctx context.Context, aggregatedTs time.Time) { diff --git a/pkg/sql/sqlstats/persistedsqlstats/flush_test.go b/pkg/sql/sqlstats/persistedsqlstats/flush_test.go index 0390cc355702..c553d4c52d5c 100644 --- a/pkg/sql/sqlstats/persistedsqlstats/flush_test.go +++ b/pkg/sql/sqlstats/persistedsqlstats/flush_test.go @@ -102,6 +102,7 @@ func TestSQLStatsFlush(t *testing.T) { pgSecondSQLConn, err := gosql.Open("postgres", secondPgURL.String()) require.NoError(t, err) secondSQLConn := sqlutils.MakeSQLRunner(pgSecondSQLConn) + secondSQLConn.Exec(t, "SET CLUSTER SETTING sql.stats.flush.minimum_interval = '0s'") firstServerSQLStats := firstServer.SQLServer().(*sql.Server).GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats) secondServerSQLStats := secondServer.SQLServer().(*sql.Server).GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats) @@ -243,6 +244,186 @@ func TestSQLStatsInitialDelay(t *testing.T) { "expected latest nextFlushAt to be %s, but found %s", maxNextRunAt, initialNextFlushAt) } +func TestSQLStatsMinimumFlushInterval(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + fakeTime := stubTime{ + aggInterval: time.Hour, + } + fakeTime.setTime(timeutil.Now()) + + params, _ := tests.CreateTestServerParams() + params.Knobs.SQLStatsKnobs = &sqlstats.TestingKnobs{ + StubTimeNow: fakeTime.Now, + } + s, conn, _ := serverutils.StartServer(t, params) + + defer s.Stopper().Stop(context.Background()) + + sqlConn := sqlutils.MakeSQLRunner(conn) + + sqlConn.Exec(t, "SET application_name = 'min_flush_test'") + sqlConn.Exec(t, "SELECT 1") + + s.SQLServer().(*sql.Server). + GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats).Flush(ctx) + + sqlConn.CheckQueryResults(t, ` + SELECT count(*) + FROM system.statement_statistics + WHERE app_name = 'min_flush_test' + `, [][]string{{"1"}}) + + sqlConn.CheckQueryResults(t, ` + SELECT count(*) + FROM system.transaction_statistics + WHERE app_name = 'min_flush_test' + `, [][]string{{"1"}}) + + // Since by default, the minimum flush interval is 10 minutes, a subsequent + // flush should be no-op. + s.SQLServer().(*sql.Server). + GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats).Flush(ctx) + + sqlConn.CheckQueryResults(t, ` + SELECT count(*) + FROM system.statement_statistics + WHERE app_name = 'min_flush_test' + `, [][]string{{"1"}}) + + sqlConn.CheckQueryResults(t, ` + SELECT count(*) + FROM system.transaction_statistics + WHERE app_name = 'min_flush_test' + `, [][]string{{"1"}}) + + // We manually set the time to past the minimum flush interval, now the flush + // should succeed. + fakeTime.setTime(fakeTime.Now().Add(time.Hour)) + + s.SQLServer().(*sql.Server). + GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats).Flush(ctx) + + sqlConn.CheckQueryResults(t, ` + SELECT count(*) > 1 + FROM system.statement_statistics + WHERE app_name = 'min_flush_test' + `, [][]string{{"true"}}) + +} + +func TestInMemoryStatsDiscard(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + params, _ := tests.CreateTestServerParams() + s, conn, _ := serverutils.StartServer(t, params) + observer := + serverutils.OpenDBConn(t, s.ServingSQLAddr(), "", false /* insecure */, s.Stopper()) + + defer s.Stopper().Stop(context.Background()) + + sqlConn := sqlutils.MakeSQLRunner(conn) + observerConn := sqlutils.MakeSQLRunner(observer) + + t.Run("flush_disabled", func(t *testing.T) { + sqlConn.Exec(t, + "SET CLUSTER SETTING sql.stats.flush.discard_in_memory_stats_when_flush_disabled.enabled = true") + sqlConn.Exec(t, + "SET CLUSTER SETTING sql.stats.flush.enabled = false") + + sqlConn.Exec(t, "SET application_name = 'flush_disabled_test'") + sqlConn.Exec(t, "SELECT 1") + + observerConn.CheckQueryResults(t, ` + SELECT count(*) + FROM crdb_internal.statement_statistics + WHERE app_name = 'flush_disabled_test' + `, [][]string{{"1"}}) + + s.SQLServer().(*sql.Server). + GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats).Flush(ctx) + + observerConn.CheckQueryResults(t, ` + SELECT count(*) + FROM crdb_internal.statement_statistics + WHERE app_name = 'flush_disabled_test' + `, [][]string{{"0"}}) + }) + + t.Run("flush_enabled", func(t *testing.T) { + // Now turn back SQL Stats flush. If the flush is aborted due to violating + // minimum flush interval constraint, we should not be clearing in-memory + // stats. + sqlConn.Exec(t, + "SET CLUSTER SETTING sql.stats.flush.enabled = true") + sqlConn.Exec(t, "SET application_name = 'flush_enabled_test'") + sqlConn.Exec(t, "SELECT 1") + + observerConn.CheckQueryResults(t, ` + SELECT count(*) + FROM crdb_internal.statement_statistics + WHERE app_name = 'flush_enabled_test' + `, [][]string{{"1"}}) + + observerConn.CheckQueryResults(t, ` + SELECT count(*) + FROM crdb_internal.transaction_statistics + WHERE app_name = 'flush_enabled_test' + `, [][]string{{"1"}}) + + // First flush should flush everything into the system tables. + s.SQLServer().(*sql.Server). + GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats).Flush(ctx) + + observerConn.CheckQueryResults(t, ` + SELECT count(*) + FROM system.statement_statistics + WHERE app_name = 'flush_enabled_test' + `, [][]string{{"1"}}) + + observerConn.CheckQueryResults(t, ` + SELECT count(*) + FROM system.transaction_statistics + WHERE app_name = 'flush_enabled_test' + `, [][]string{{"1"}}) + + sqlConn.Exec(t, "SELECT 1,1") + + // Second flush should be aborted due to violating the minimum flush + // interval requirement. Though the data should still remain in-memory. + s.SQLServer().(*sql.Server). + GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats).Flush(ctx) + + observerConn.CheckQueryResults(t, ` + SELECT count(*) + FROM system.statement_statistics + WHERE app_name = 'flush_enabled_test' + `, [][]string{{"1"}}) + + observerConn.CheckQueryResults(t, ` + SELECT count(*) + FROM system.transaction_statistics + WHERE app_name = 'flush_enabled_test' + `, [][]string{{"1"}}) + + observerConn.CheckQueryResults(t, ` + SELECT count(*) + FROM crdb_internal.statement_statistics + WHERE app_name = 'flush_enabled_test' + `, [][]string{{"2"}}) + + observerConn.CheckQueryResults(t, ` + SELECT count(*) + FROM crdb_internal.transaction_statistics + WHERE app_name = 'flush_enabled_test' + `, [][]string{{"2"}}) + }) +} + type stubTime struct { syncutil.RWMutex t time.Time diff --git a/pkg/sql/sqlstats/persistedsqlstats/provider.go b/pkg/sql/sqlstats/persistedsqlstats/provider.go index 3327c23902e4..655940ce9e30 100644 --- a/pkg/sql/sqlstats/persistedsqlstats/provider.go +++ b/pkg/sql/sqlstats/persistedsqlstats/provider.go @@ -146,10 +146,7 @@ func (s *PersistedSQLStats) startSQLStatsFlushLoop(ctx context.Context, stopper return } - enabled := SQLStatsFlushEnabled.Get(&s.cfg.Settings.SV) - if enabled { - s.Flush(ctx) - } + s.Flush(ctx) } }) } diff --git a/pkg/sql/sqlstats/persistedsqlstats/testdata/consistent_flush_timestamp b/pkg/sql/sqlstats/persistedsqlstats/testdata/consistent_flush_timestamp index ba6897be35ca..8b00d73ce135 100644 --- a/pkg/sql/sqlstats/persistedsqlstats/testdata/consistent_flush_timestamp +++ b/pkg/sql/sqlstats/persistedsqlstats/testdata/consistent_flush_timestamp @@ -1,3 +1,7 @@ +exec-sql +SET CLUSTER SETTING sql.stats.flush.minimum_interval = '0s' +---- + exec-sql SET application_name = 'consistent-test' ---- diff --git a/pkg/sql/sqlstats/persistedsqlstats/testdata/stmt_grouping_in_explicit_txn b/pkg/sql/sqlstats/persistedsqlstats/testdata/stmt_grouping_in_explicit_txn index 834fe91b184c..8bdadaa0e32d 100644 --- a/pkg/sql/sqlstats/persistedsqlstats/testdata/stmt_grouping_in_explicit_txn +++ b/pkg/sql/sqlstats/persistedsqlstats/testdata/stmt_grouping_in_explicit_txn @@ -1,3 +1,7 @@ +exec-sql +SET CLUSTER SETTING sql.stats.flush.minimum_interval = '0s' +---- + exec-sql SET application_name = 'test' ---- @@ -170,6 +174,9 @@ WHERE WHERE jsonb_array_length(metadata -> 'stmtFingerprintIDs') >= 3 ) +ORDER BY + (statistics -> 'statistics' ->> 'cnt')::INT +ASC ---- -df3c70bf7729b433,705fcdf3f12803ec,SELECT _,2 15c74af7a18bd6da,baa4f7bb278a6105,SELECT _, _,1 +df3c70bf7729b433,705fcdf3f12803ec,SELECT _,2