From 7c56ef00429725af5f9531cd19efc66fe7689e5a Mon Sep 17 00:00:00 2001 From: Azhng Date: Wed, 23 Mar 2022 21:24:13 +0000 Subject: [PATCH] sql: new SQL Stats cluster settings to improve write traffic Resolves #78339 Previously, SQL Stats were flushed to the system tables as soon as the in-memory buffer became full. This meant the system tables that back SQL Stats could grow faster than the cleanup job could prune them. Additionally, when SQL Stats flush was disabled, SQL Stats could not collect any new statement / transaction statistics once the in-memory store was full. This commit introduces two non-public cluster settings: * `sql.stats.flush.minimum_interval`: sets the minimum interval between consecutive flush operations. If a flush operation is triggered sooner than the minimum interval allows (e.g. when the in-memory SQL Stats store is full), the flush operation is aborted. By default, this cluster setting is set to 0. * `sql.stats.flush.force_cleanup.enabled`: allows the in-memory SQL Stats to be cleared at the interval specified by `sql.stats.flush.interval`, even if SQL Stats flush is disabled. By default, this cluster setting is set to false. This commit also updates the stmt_grouping_in_explicit_txn data-driven test to ensure its output order is deterministic. 
Release note: None --- .../persistedsqlstats/cluster_settings.go | 26 +++ pkg/sql/sqlstats/persistedsqlstats/flush.go | 41 +++- .../sqlstats/persistedsqlstats/flush_test.go | 183 ++++++++++++++++++ .../sqlstats/persistedsqlstats/provider.go | 5 +- .../testdata/stmt_grouping_in_explicit_txn | 5 +- 5 files changed, 250 insertions(+), 10 deletions(-) diff --git a/pkg/sql/sqlstats/persistedsqlstats/cluster_settings.go b/pkg/sql/sqlstats/persistedsqlstats/cluster_settings.go index be17f35183ab..c77119db607b 100644 --- a/pkg/sql/sqlstats/persistedsqlstats/cluster_settings.go +++ b/pkg/sql/sqlstats/persistedsqlstats/cluster_settings.go @@ -29,6 +29,32 @@ var SQLStatsFlushInterval = settings.RegisterDurationSetting( settings.NonNegativeDurationWithMaximum(time.Hour*24), ).WithPublic() +// MinimumInterval is the cluster setting that controls the minimum interval +// between each flush operation. If flush operations get triggered faster +// than what is allowed by this setting, (e.g. when too many fingerprints are +// generated in a short span of time, which in turn cause memory pressure), the +// flush operation will be aborted. +var MinimumInterval = settings.RegisterDurationSetting( + settings.TenantWritable, + "sql.stats.flush.minimum_interval", + "the minimum interval that SQL stats can be flushes to disk. If a "+ + "flush operation starts within less than the minimum interval, the flush "+ + "operation will be aborted", + 0, + settings.NonNegativeDuration, +) + +// DiscardInMemoryStatsWhenFlushDisabled is the cluster setting that allows the +// older in-memory SQL stats to be discarded when flushing to persisted tables +// is disabled. 
+var DiscardInMemoryStatsWhenFlushDisabled = settings.RegisterBoolSetting( + settings.TenantWritable, + "sql.stats.flush.force_cleanup.enabled", + "if set, older SQL stats are discarded periodically when flushing to "+ + "persisted tables is disabled", + false, +) + // SQLStatsFlushEnabled is the cluster setting that controls if the sqlstats // subsystem persists the statistics into system table. var SQLStatsFlushEnabled = settings.RegisterBoolSetting( diff --git a/pkg/sql/sqlstats/persistedsqlstats/flush.go b/pkg/sql/sqlstats/persistedsqlstats/flush.go index 4d290242b6a4..110f01fc6bbd 100644 --- a/pkg/sql/sqlstats/persistedsqlstats/flush.go +++ b/pkg/sql/sqlstats/persistedsqlstats/flush.go @@ -29,18 +29,49 @@ import ( // Flush flushes in-memory sql stats into system table. Any errors encountered // during the flush will be logged as warning. func (s *PersistedSQLStats) Flush(ctx context.Context) { + now := s.getTimeNow() + + allowDiscardWhenDisabled := DiscardInMemoryStatsWhenFlushDisabled.Get(&s.cfg.Settings.SV) + minimumFlushInterval := MinimumInterval.Get(&s.cfg.Settings.SV) + + enabled := SQLStatsFlushEnabled.Get(&s.cfg.Settings.SV) + flushingTooSoon := now.Before(s.lastFlushStarted.Add(minimumFlushInterval)) + + // Handle wiping in-memory stats here, we only wipe in-memory stats under 2 + // circumstances: + // 1. flush is enabled, and we are not early aborting the flush due to flushing + // too frequently. + // 2. flush is disabled, but we allow discard in-memory stats when disabled. + shouldWipeInMemoryStats := enabled && !flushingTooSoon + shouldWipeInMemoryStats = shouldWipeInMemoryStats || (!enabled && allowDiscardWhenDisabled) + + if shouldWipeInMemoryStats { + defer func() { + if err := s.SQLStats.Reset(ctx); err != nil { + log.Warningf(ctx, "fail to reset in-memory SQL Stats: %s", err) + } + }() + } + + // Handle early abortion of the flush. 
+ if !enabled { + return + } + + if flushingTooSoon { + log.Infof(ctx, "flush aborted due to high flush frequency. "+ + "The minimum interval between flushes is %s", minimumFlushInterval.String()) + return + } + + s.lastFlushStarted = now log.Infof(ctx, "flushing %d stmt/txn fingerprints (%d bytes) after %s", s.SQLStats.GetTotalFingerprintCount(), s.SQLStats.GetTotalFingerprintBytes(), timeutil.Since(s.lastFlushStarted)) aggregatedTs := s.ComputeAggregatedTs() - s.lastFlushStarted = s.getTimeNow() s.flushStmtStats(ctx, aggregatedTs) s.flushTxnStats(ctx, aggregatedTs) - - if err := s.SQLStats.Reset(ctx); err != nil { - log.Warningf(ctx, "fail to reset in-memory SQL Stats: %s", err) - } } func (s *PersistedSQLStats) flushStmtStats(ctx context.Context, aggregatedTs time.Time) { diff --git a/pkg/sql/sqlstats/persistedsqlstats/flush_test.go b/pkg/sql/sqlstats/persistedsqlstats/flush_test.go index 0390cc355702..c2d806e2fc86 100644 --- a/pkg/sql/sqlstats/persistedsqlstats/flush_test.go +++ b/pkg/sql/sqlstats/persistedsqlstats/flush_test.go @@ -243,6 +243,189 @@ func TestSQLStatsInitialDelay(t *testing.T) { "expected latest nextFlushAt to be %s, but found %s", maxNextRunAt, initialNextFlushAt) } +func TestSQLStatsMinimumFlushInterval(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + fakeTime := stubTime{ + aggInterval: time.Hour, + } + fakeTime.setTime(timeutil.Now()) + + params, _ := tests.CreateTestServerParams() + params.Knobs.SQLStatsKnobs = &sqlstats.TestingKnobs{ + StubTimeNow: fakeTime.Now, + } + s, conn, _ := serverutils.StartServer(t, params) + + defer s.Stopper().Stop(context.Background()) + + sqlConn := sqlutils.MakeSQLRunner(conn) + + sqlConn.Exec(t, "SET CLUSTER SETTING sql.stats.flush.minimum_interval = '10m'") + sqlConn.Exec(t, "SET application_name = 'min_flush_test'") + sqlConn.Exec(t, "SELECT 1") + + s.SQLServer().(*sql.Server). 
+ GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats).Flush(ctx) + + sqlConn.CheckQueryResults(t, ` + SELECT count(*) + FROM system.statement_statistics + WHERE app_name = 'min_flush_test' + `, [][]string{{"1"}}) + + sqlConn.CheckQueryResults(t, ` + SELECT count(*) + FROM system.transaction_statistics + WHERE app_name = 'min_flush_test' + `, [][]string{{"1"}}) + + // Since by default, the minimum flush interval is 10 minutes, a subsequent + // flush should be no-op. + s.SQLServer().(*sql.Server). + GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats).Flush(ctx) + + sqlConn.CheckQueryResults(t, ` + SELECT count(*) + FROM system.statement_statistics + WHERE app_name = 'min_flush_test' + `, [][]string{{"1"}}) + + sqlConn.CheckQueryResults(t, ` + SELECT count(*) + FROM system.transaction_statistics + WHERE app_name = 'min_flush_test' + `, [][]string{{"1"}}) + + // We manually set the time to past the minimum flush interval, now the flush + // should succeed. + fakeTime.setTime(fakeTime.Now().Add(time.Hour)) + + s.SQLServer().(*sql.Server). 
+ GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats).Flush(ctx) + + sqlConn.CheckQueryResults(t, ` + SELECT count(*) > 1 + FROM system.statement_statistics + WHERE app_name = 'min_flush_test' + `, [][]string{{"true"}}) + +} + +func TestInMemoryStatsDiscard(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + params, _ := tests.CreateTestServerParams() + s, conn, _ := serverutils.StartServer(t, params) + observer := + serverutils.OpenDBConn(t, s.ServingSQLAddr(), "", false /* insecure */, s.Stopper()) + + defer s.Stopper().Stop(context.Background()) + + sqlConn := sqlutils.MakeSQLRunner(conn) + sqlConn.Exec(t, + "SET CLUSTER SETTING sql.stats.flush.minimum_interval = '10m'") + observerConn := sqlutils.MakeSQLRunner(observer) + + t.Run("flush_disabled", func(t *testing.T) { + sqlConn.Exec(t, + "SET CLUSTER SETTING sql.stats.flush.force_cleanup.enabled = true") + sqlConn.Exec(t, + "SET CLUSTER SETTING sql.stats.flush.enabled = false") + + sqlConn.Exec(t, "SET application_name = 'flush_disabled_test'") + sqlConn.Exec(t, "SELECT 1") + + observerConn.CheckQueryResults(t, ` + SELECT count(*) + FROM crdb_internal.statement_statistics + WHERE app_name = 'flush_disabled_test' + `, [][]string{{"1"}}) + + s.SQLServer().(*sql.Server). + GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats).Flush(ctx) + + observerConn.CheckQueryResults(t, ` + SELECT count(*) + FROM crdb_internal.statement_statistics + WHERE app_name = 'flush_disabled_test' + `, [][]string{{"0"}}) + }) + + t.Run("flush_enabled", func(t *testing.T) { + // Now turn back SQL Stats flush. If the flush is aborted due to violating + // minimum flush interval constraint, we should not be clearing in-memory + // stats. 
+ sqlConn.Exec(t, + "SET CLUSTER SETTING sql.stats.flush.enabled = true") + sqlConn.Exec(t, "SET application_name = 'flush_enabled_test'") + sqlConn.Exec(t, "SELECT 1") + + observerConn.CheckQueryResults(t, ` + SELECT count(*) + FROM crdb_internal.statement_statistics + WHERE app_name = 'flush_enabled_test' + `, [][]string{{"1"}}) + + observerConn.CheckQueryResults(t, ` + SELECT count(*) + FROM crdb_internal.transaction_statistics + WHERE app_name = 'flush_enabled_test' + `, [][]string{{"1"}}) + + // First flush should flush everything into the system tables. + s.SQLServer().(*sql.Server). + GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats).Flush(ctx) + + observerConn.CheckQueryResults(t, ` + SELECT count(*) + FROM system.statement_statistics + WHERE app_name = 'flush_enabled_test' + `, [][]string{{"1"}}) + + observerConn.CheckQueryResults(t, ` + SELECT count(*) + FROM system.transaction_statistics + WHERE app_name = 'flush_enabled_test' + `, [][]string{{"1"}}) + + sqlConn.Exec(t, "SELECT 1,1") + + // Second flush should be aborted due to violating the minimum flush + // interval requirement. Though the data should still remain in-memory. + s.SQLServer().(*sql.Server). 
+ GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats).Flush(ctx) + + observerConn.CheckQueryResults(t, ` + SELECT count(*) + FROM system.statement_statistics + WHERE app_name = 'flush_enabled_test' + `, [][]string{{"1"}}) + + observerConn.CheckQueryResults(t, ` + SELECT count(*) + FROM system.transaction_statistics + WHERE app_name = 'flush_enabled_test' + `, [][]string{{"1"}}) + + observerConn.CheckQueryResults(t, ` + SELECT count(*) + FROM crdb_internal.statement_statistics + WHERE app_name = 'flush_enabled_test' + `, [][]string{{"2"}}) + + observerConn.CheckQueryResults(t, ` + SELECT count(*) + FROM crdb_internal.transaction_statistics + WHERE app_name = 'flush_enabled_test' + `, [][]string{{"2"}}) + }) +} + type stubTime struct { syncutil.RWMutex t time.Time diff --git a/pkg/sql/sqlstats/persistedsqlstats/provider.go b/pkg/sql/sqlstats/persistedsqlstats/provider.go index 0ea6b20c8107..bc4ef152a621 100644 --- a/pkg/sql/sqlstats/persistedsqlstats/provider.go +++ b/pkg/sql/sqlstats/persistedsqlstats/provider.go @@ -141,10 +141,7 @@ func (s *PersistedSQLStats) startSQLStatsFlushLoop(ctx context.Context, stopper return } - enabled := SQLStatsFlushEnabled.Get(&s.cfg.Settings.SV) - if enabled { - s.Flush(ctx) - } + s.Flush(ctx) } }) } diff --git a/pkg/sql/sqlstats/persistedsqlstats/testdata/stmt_grouping_in_explicit_txn b/pkg/sql/sqlstats/persistedsqlstats/testdata/stmt_grouping_in_explicit_txn index 834fe91b184c..321e190378e8 100644 --- a/pkg/sql/sqlstats/persistedsqlstats/testdata/stmt_grouping_in_explicit_txn +++ b/pkg/sql/sqlstats/persistedsqlstats/testdata/stmt_grouping_in_explicit_txn @@ -170,6 +170,9 @@ WHERE WHERE jsonb_array_length(metadata -> 'stmtFingerprintIDs') >= 3 ) +ORDER BY + (statistics -> 'statistics' ->> 'cnt')::INT +ASC ---- -df3c70bf7729b433,705fcdf3f12803ec,SELECT _,2 15c74af7a18bd6da,baa4f7bb278a6105,SELECT _, _,1 +df3c70bf7729b433,705fcdf3f12803ec,SELECT _,2