Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-21.2: sql: new SQL Stats cluster settings to improve write traffic #78966

Merged
merged 1 commit into from
Mar 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions pkg/sql/sqlstats/persistedsqlstats/cluster_settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,30 @@ var SQLStatsFlushInterval = settings.RegisterDurationSetting(
settings.NonNegativeDurationWithMaximum(time.Hour*24),
).WithPublic()

// MinimumInterval is the cluster setting that controls the minimum interval
// between two consecutive flush operations. If a flush is triggered sooner
// than this setting allows (e.g. when too many fingerprints are generated in a
// short span of time, which in turn causes memory pressure), that flush
// operation is aborted. The setting must be non-negative and defaults to 0.
var MinimumInterval = settings.RegisterDurationSetting(
	"sql.stats.flush.minimum_interval",
	"the minimum interval that SQL stats can be flushed to disk. If a "+
		"flush operation starts within less than the minimum interval, the flush "+
		"operation will be aborted",
	0,
	settings.NonNegativeDuration,
)

// DiscardInMemoryStatsWhenFlushDisabled is a boolean cluster setting. When it
// is set, older in-memory SQL stats are allowed to be discarded even though
// flushing to the persisted tables is disabled. It defaults to false.
var DiscardInMemoryStatsWhenFlushDisabled = settings.RegisterBoolSetting(
	"sql.stats.flush.force_cleanup.enabled",
	"if set, older SQL stats are discarded periodically when flushing to persisted tables is disabled",
	false,
)

// SQLStatsFlushEnabled is the cluster setting that controls if the sqlstats
// subsystem persists the statistics into system table.
var SQLStatsFlushEnabled = settings.RegisterBoolSetting(
Expand Down
41 changes: 36 additions & 5 deletions pkg/sql/sqlstats/persistedsqlstats/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,49 @@ import (
// Flush flushes in-memory sql stats into system table. Any errors encountered
// during the flush will be logged as warning.
//
// The flush is skipped entirely when SQLStatsFlushEnabled is false, and it is
// aborted when it starts less than MinimumInterval after the previous flush
// started. In-memory stats are reset exactly once, on return, when either:
//  1. the flush is enabled and actually runs (it is not aborted for being too
//     frequent), or
//  2. the flush is disabled but DiscardInMemoryStatsWhenFlushDisabled is set.
func (s *PersistedSQLStats) Flush(ctx context.Context) {
	now := s.getTimeNow()

	allowDiscardWhenDisabled := DiscardInMemoryStatsWhenFlushDisabled.Get(&s.cfg.Settings.SV)
	minimumFlushInterval := MinimumInterval.Get(&s.cfg.Settings.SV)

	enabled := SQLStatsFlushEnabled.Get(&s.cfg.Settings.SV)
	flushingTooSoon := now.Before(s.lastFlushStarted.Add(minimumFlushInterval))

	// Decide up front whether in-memory stats get wiped, so that the early
	// returns below do not need to repeat the decision. An aborted (too
	// frequent) flush must keep the in-memory stats, since nothing was
	// persisted.
	shouldWipeInMemoryStats := (enabled && !flushingTooSoon) ||
		(!enabled && allowDiscardWhenDisabled)

	if shouldWipeInMemoryStats {
		// Deferred so that the reset happens after the stats have been written
		// out (or immediately on return when the flush is disabled).
		defer func() {
			if err := s.SQLStats.Reset(ctx); err != nil {
				log.Warningf(ctx, "fail to reset in-memory SQL Stats: %s", err)
			}
		}()
	}

	// Handle early abortion of the flush.
	if !enabled {
		return
	}

	if flushingTooSoon {
		log.Infof(ctx, "flush aborted due to high flush frequency. "+
			"The minimum interval between flushes is %s", minimumFlushInterval)
		return
	}

	// Report the time elapsed since the previous flush before recording the
	// start of this one; updating lastFlushStarted first would make the
	// reported duration meaningless.
	log.Infof(ctx, "flushing %d stmt/txn fingerprints (%d bytes) after %s",
		s.SQLStats.GetTotalFingerprintCount(), s.SQLStats.GetTotalFingerprintBytes(), timeutil.Since(s.lastFlushStarted))
	s.lastFlushStarted = now

	aggregatedTs := s.ComputeAggregatedTs()

	s.flushStmtStats(ctx, aggregatedTs)
	s.flushTxnStats(ctx, aggregatedTs)
}

func (s *PersistedSQLStats) flushStmtStats(ctx context.Context, aggregatedTs time.Time) {
Expand Down
183 changes: 183 additions & 0 deletions pkg/sql/sqlstats/persistedsqlstats/flush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,189 @@ func TestSQLStatsInitialDelay(t *testing.T) {
"expected latest nextFlushAt to be %s, but found %s", maxNextRunAt, initialNextFlushAt)
}

// TestSQLStatsMinimumFlushInterval checks that a flush requested within
// sql.stats.flush.minimum_interval of the previous flush does not change the
// persisted tables, and that flushing works again once the (stubbed) clock has
// moved past the interval.
func TestSQLStatsMinimumFlushInterval(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	ctx := context.Background()

	// The SQL stats subsystem reads its clock from this stub, which lets the
	// test move time forward by hand.
	fakeTime := stubTime{
		aggInterval: time.Hour,
	}
	fakeTime.setTime(timeutil.Now())

	params, _ := tests.CreateTestServerParams()
	params.Knobs.SQLStatsKnobs = &sqlstats.TestingKnobs{
		StubTimeNow: fakeTime.Now,
	}
	s, conn, _ := serverutils.StartServer(t, params)
	defer s.Stopper().Stop(context.Background())

	provider := s.SQLServer().(*sql.Server).
		GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats)
	sqlConn := sqlutils.MakeSQLRunner(conn)

	const persistedStmtCount = `
SELECT count(*)
FROM system.statement_statistics
WHERE app_name = 'min_flush_test'
`
	const persistedTxnCount = `
SELECT count(*)
FROM system.transaction_statistics
WHERE app_name = 'min_flush_test'
`
	oneRow := [][]string{{"1"}}

	sqlConn.Exec(t, "SET CLUSTER SETTING sql.stats.flush.minimum_interval = '10m'")
	sqlConn.Exec(t, "SET application_name = 'min_flush_test'")
	sqlConn.Exec(t, "SELECT 1")

	// The very first flush is never too soon, so it must persist the stats.
	provider.Flush(ctx)
	sqlConn.CheckQueryResults(t, persistedStmtCount, oneRow)
	sqlConn.CheckQueryResults(t, persistedTxnCount, oneRow)

	// The minimum flush interval was set to 10 minutes above and the stubbed
	// clock has not moved, so this flush must be a no-op.
	provider.Flush(ctx)
	sqlConn.CheckQueryResults(t, persistedStmtCount, oneRow)
	sqlConn.CheckQueryResults(t, persistedTxnCount, oneRow)

	// Advance the clock well past the minimum flush interval; the flush should
	// now go through.
	// NOTE(review): presumably the additional rows come from the verification
	// queries above, which ran under the same application name — confirm.
	fakeTime.setTime(fakeTime.Now().Add(time.Hour))
	provider.Flush(ctx)

	sqlConn.CheckQueryResults(t, `
SELECT count(*) > 1
FROM system.statement_statistics
WHERE app_name = 'min_flush_test'
`, [][]string{{"true"}})
}

// TestInMemoryStatsDiscard checks when Flush wipes the in-memory SQL stats:
// they are discarded when flushing is disabled but force cleanup is enabled,
// and they are retained when an enabled flush is aborted for violating the
// minimum flush interval.
func TestInMemoryStatsDiscard(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	ctx := context.Background()
	params, _ := tests.CreateTestServerParams()
	s, conn, _ := serverutils.StartServer(t, params)
	// A second connection is used for all assertions, separate from the one
	// that runs the workload under the test application names.
	observer :=
		serverutils.OpenDBConn(t, s.ServingSQLAddr(), "", false /* insecure */, s.Stopper())

	defer s.Stopper().Stop(context.Background())

	provider := s.SQLServer().(*sql.Server).
		GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats)

	sqlConn := sqlutils.MakeSQLRunner(conn)
	sqlConn.Exec(t,
		"SET CLUSTER SETTING sql.stats.flush.minimum_interval = '10m'")
	observerConn := sqlutils.MakeSQLRunner(observer)

	t.Run("flush_disabled", func(t *testing.T) {
		const inMemoryStmtCount = `
SELECT count(*)
FROM crdb_internal.statement_statistics
WHERE app_name = 'flush_disabled_test'
`

		sqlConn.Exec(t,
			"SET CLUSTER SETTING sql.stats.flush.force_cleanup.enabled = true")
		sqlConn.Exec(t,
			"SET CLUSTER SETTING sql.stats.flush.enabled = false")

		sqlConn.Exec(t, "SET application_name = 'flush_disabled_test'")
		sqlConn.Exec(t, "SELECT 1")

		// The statement is visible in memory before the flush...
		observerConn.CheckQueryResults(t, inMemoryStmtCount, [][]string{{"1"}})

		provider.Flush(ctx)

		// ...and gone afterwards, because force cleanup is enabled.
		observerConn.CheckQueryResults(t, inMemoryStmtCount, [][]string{{"0"}})
	})

	t.Run("flush_enabled", func(t *testing.T) {
		const inMemoryStmtCount = `
SELECT count(*)
FROM crdb_internal.statement_statistics
WHERE app_name = 'flush_enabled_test'
`
		const inMemoryTxnCount = `
SELECT count(*)
FROM crdb_internal.transaction_statistics
WHERE app_name = 'flush_enabled_test'
`
		const persistedStmtCount = `
SELECT count(*)
FROM system.statement_statistics
WHERE app_name = 'flush_enabled_test'
`
		const persistedTxnCount = `
SELECT count(*)
FROM system.transaction_statistics
WHERE app_name = 'flush_enabled_test'
`
		oneRow := [][]string{{"1"}}
		twoRows := [][]string{{"2"}}

		// Re-enable SQL Stats flush. A flush that is aborted because it violates
		// the minimum flush interval must leave the in-memory stats untouched.
		sqlConn.Exec(t,
			"SET CLUSTER SETTING sql.stats.flush.enabled = true")
		sqlConn.Exec(t, "SET application_name = 'flush_enabled_test'")
		sqlConn.Exec(t, "SELECT 1")

		observerConn.CheckQueryResults(t, inMemoryStmtCount, oneRow)
		observerConn.CheckQueryResults(t, inMemoryTxnCount, oneRow)

		// The first flush is expected to write everything into the system
		// tables.
		provider.Flush(ctx)

		observerConn.CheckQueryResults(t, persistedStmtCount, oneRow)
		observerConn.CheckQueryResults(t, persistedTxnCount, oneRow)

		sqlConn.Exec(t, "SELECT 1,1")

		// The second flush comes too soon after the first and is expected to be
		// aborted: the system tables stay as they were, while the stats remain
		// readable through the crdb_internal tables.
		provider.Flush(ctx)

		observerConn.CheckQueryResults(t, persistedStmtCount, oneRow)
		observerConn.CheckQueryResults(t, persistedTxnCount, oneRow)
		observerConn.CheckQueryResults(t, inMemoryStmtCount, twoRows)
		observerConn.CheckQueryResults(t, inMemoryTxnCount, twoRows)
	})
}

type stubTime struct {
syncutil.RWMutex
t time.Time
Expand Down
5 changes: 1 addition & 4 deletions pkg/sql/sqlstats/persistedsqlstats/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,7 @@ func (s *PersistedSQLStats) startSQLStatsFlushLoop(ctx context.Context, stopper
return
}

enabled := SQLStatsFlushEnabled.Get(&s.cfg.Settings.SV)
if enabled {
s.Flush(ctx)
}
s.Flush(ctx)
}
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,9 @@ WHERE
WHERE
jsonb_array_length(metadata -> 'stmtFingerprintIDs') >= 3
)
ORDER BY
(statistics -> 'statistics' ->> 'cnt')::INT
ASC
----
df3c70bf7729b433,705fcdf3f12803ec,SELECT _,2
15c74af7a18bd6da,baa4f7bb278a6105,SELECT _, _,1
df3c70bf7729b433,705fcdf3f12803ec,SELECT _,2