Skip to content

Commit

Permalink
sql: new SQL Stats cluster settings to improve write traffic
Browse files Browse the repository at this point in the history
Resolves cockroachdb#78339

Previously, SQL Stats were flushed to the system tables as soon as the
in-memory buffer was full. This meant the size of the system tables that back
SQL Stats could grow faster than the cleanup job could shrink them.
Additionally, when the SQL Stats flush was disabled, SQL Stats was unable to
collect any new statement / transaction statistics once the in-memory store
was full.

This commit introduces two non-public cluster settings:
* `sql.stats.flush.minimum_interval`: this setting limits the minimum interval
  between flush operations. If a flush operation is triggered sooner than
  what is allowed by the minimum interval (e.g. when the in-memory SQL Stats
  store is full), the flush operation is aborted.
* `sql.stats.flush.discard_in_memory_stats_when_flush_disabled.enabled`: this
  setting allows the in-memory SQL Stats to be cleared at the interval
  specified by `sql.stats.flush.interval`, even if the SQL Stats flush is
  disabled.

Release note: None
  • Loading branch information
Azhng committed Mar 24, 2022
1 parent c954f7a commit b8d9854
Show file tree
Hide file tree
Showing 6 changed files with 256 additions and 10 deletions.
26 changes: 26 additions & 0 deletions pkg/sql/sqlstats/persistedsqlstats/cluster_settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,32 @@ var SQLStatsFlushInterval = settings.RegisterDurationSetting(
settings.NonNegativeDurationWithMaximum(time.Hour*24),
).WithPublic()

// MinimumFlushInterval is the cluster setting that controls the minimum
// interval between each flush operation. If flush operations get triggered
// faster than what is allowed by this setting (e.g. when too many fingerprints
// are generated in a short span of time, which in turn causes memory
// pressure), the flush operation will be aborted.
//
// The setting defaults to 10 minutes and must be a non-negative duration.
var MinimumFlushInterval = settings.RegisterDurationSetting(
	settings.TenantWritable,
	"sql.stats.flush.minimum_interval",
	"the minimum interval that SQL stats can be flushed to disk. If a "+
		"flush operation starts within less than the minimum interval, the flush "+
		"operation will be aborted",
	time.Minute*10,
	settings.NonNegativeDuration,
)

// DiscardInMemoryStatsWhenFlushDisabled is a boolean cluster setting (false by
// default). When it is set, older in-memory SQL stats may be discarded even
// though flushing to the persisted tables is disabled.
var DiscardInMemoryStatsWhenFlushDisabled = settings.RegisterBoolSetting(
	settings.TenantWritable,
	"sql.stats.flush.discard_in_memory_stats_when_flush_disabled.enabled",
	"if set, older SQL stats are discarded periodically "+
		"when flushing to persisted tables is disabled",
	false,
)

// SQLStatsFlushEnabled is the cluster setting that controls if the sqlstats
// subsystem persists the statistics into system table.
var SQLStatsFlushEnabled = settings.RegisterBoolSetting(
Expand Down
41 changes: 36 additions & 5 deletions pkg/sql/sqlstats/persistedsqlstats/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,49 @@ import (
// Flush flushes in-memory sql stats into system table. Any errors encountered
// during the flush will be logged as warning.
//
// The flush is skipped when SQLStatsFlushEnabled is false, or when less than
// MinimumFlushInterval has elapsed since the previous flush started.
//
// The in-memory stats are reset when Flush returns under two circumstances:
//  1. flush is enabled, and the flush was not aborted for being too frequent;
//  2. flush is disabled, but DiscardInMemoryStatsWhenFlushDisabled is set.
func (s *PersistedSQLStats) Flush(ctx context.Context) {
	now := s.getTimeNow()

	allowDiscardWhenDisabled := DiscardInMemoryStatsWhenFlushDisabled.Get(&s.cfg.Settings.SV)
	minimumFlushInterval := MinimumFlushInterval.Get(&s.cfg.Settings.SV)

	enabled := SQLStatsFlushEnabled.Get(&s.cfg.Settings.SV)
	flushingTooSoon := now.Before(s.lastFlushStarted.Add(minimumFlushInterval))

	shouldWipeInMemoryStats := (enabled && !flushingTooSoon) ||
		(!enabled && allowDiscardWhenDisabled)

	if shouldWipeInMemoryStats {
		// Deferred so that the reset runs when Flush returns: after the stats
		// have been written on the flush path, or immediately on the
		// disabled-but-discard path. This is the only reset; there is no second
		// explicit Reset at the end of the function.
		defer func() {
			if err := s.SQLStats.Reset(ctx); err != nil {
				log.Warningf(ctx, "fail to reset in-memory SQL Stats: %s", err)
			}
		}()
	}

	// Handle early abortion of the flush.
	if !enabled {
		return
	}

	if flushingTooSoon {
		log.Infof(ctx, "flush aborted due to high flush frequency. "+
			"The minimum interval between flushes is %s", minimumFlushInterval.String())
		return
	}

	// Capture the time elapsed since the previous flush before
	// lastFlushStarted is overwritten; otherwise the logged duration would
	// always be measured against the flush that is just starting.
	sinceLastFlush := timeutil.Since(s.lastFlushStarted)
	s.lastFlushStarted = now
	log.Infof(ctx, "flushing %d stmt/txn fingerprints (%d bytes) after %s",
		s.SQLStats.GetTotalFingerprintCount(), s.SQLStats.GetTotalFingerprintBytes(), sinceLastFlush)

	aggregatedTs := s.ComputeAggregatedTs()

	s.flushStmtStats(ctx, aggregatedTs)
	s.flushTxnStats(ctx, aggregatedTs)
}

func (s *PersistedSQLStats) flushStmtStats(ctx context.Context, aggregatedTs time.Time) {
Expand Down
181 changes: 181 additions & 0 deletions pkg/sql/sqlstats/persistedsqlstats/flush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func TestSQLStatsFlush(t *testing.T) {
pgSecondSQLConn, err := gosql.Open("postgres", secondPgURL.String())
require.NoError(t, err)
secondSQLConn := sqlutils.MakeSQLRunner(pgSecondSQLConn)
secondSQLConn.Exec(t, "SET CLUSTER SETTING sql.stats.flush.minimum_interval = '0s'")

firstServerSQLStats := firstServer.SQLServer().(*sql.Server).GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats)
secondServerSQLStats := secondServer.SQLServer().(*sql.Server).GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats)
Expand Down Expand Up @@ -243,6 +244,186 @@ func TestSQLStatsInitialDelay(t *testing.T) {
"expected latest nextFlushAt to be %s, but found %s", maxNextRunAt, initialNextFlushAt)
}

// TestSQLStatsMinimumFlushInterval checks that a flush issued before the
// minimum flush interval has elapsed writes nothing new to the system tables,
// and that a flush issued after the (stubbed) clock has moved past the
// interval writes again.
func TestSQLStatsMinimumFlushInterval(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	// The query text is kept flush-left so the SQL sent to the server is
	// unchanged.
	const persistedStmtCountQuery = `
SELECT count(*)
FROM system.statement_statistics
WHERE app_name = 'min_flush_test'
`
	const persistedTxnCountQuery = `
SELECT count(*)
FROM system.transaction_statistics
WHERE app_name = 'min_flush_test'
`
	const persistedStmtGrewQuery = `
SELECT count(*) > 1
FROM system.statement_statistics
WHERE app_name = 'min_flush_test'
`

	ctx := context.Background()

	// The stats provider reads the time from this stub, which lets the test
	// move the clock forward without sleeping.
	clock := stubTime{aggInterval: time.Hour}
	clock.setTime(timeutil.Now())

	params, _ := tests.CreateTestServerParams()
	params.Knobs.SQLStatsKnobs = &sqlstats.TestingKnobs{StubTimeNow: clock.Now}
	s, conn, _ := serverutils.StartServer(t, params)
	defer s.Stopper().Stop(ctx)

	sqlConn := sqlutils.MakeSQLRunner(conn)
	sqlStats := s.SQLServer().(*sql.Server).
		GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats)

	// expectPersistedCounts asserts the number of persisted statement rows and
	// then transaction rows for this test's application name.
	expectPersistedCounts := func(want string) {
		t.Helper()
		sqlConn.CheckQueryResults(t, persistedStmtCountQuery, [][]string{{want}})
		sqlConn.CheckQueryResults(t, persistedTxnCountQuery, [][]string{{want}})
	}

	sqlConn.Exec(t, "SET application_name = 'min_flush_test'")
	sqlConn.Exec(t, "SELECT 1")

	// The first flush goes through.
	sqlStats.Flush(ctx)
	expectPersistedCounts("1")

	// The minimum flush interval defaults to 10 minutes and the stubbed clock
	// has not moved, so an immediate second flush should be a no-op.
	sqlStats.Flush(ctx)
	expectPersistedCounts("1")

	// Advance the clock beyond the minimum flush interval; now the flush
	// should succeed.
	clock.setTime(clock.Now().Add(time.Hour))
	sqlStats.Flush(ctx)

	sqlConn.CheckQueryResults(t, persistedStmtGrewQuery, [][]string{{"true"}})
}

// TestInMemoryStatsDiscard checks when Flush clears the in-memory SQL stats:
//   - with flush disabled and the discard setting on, Flush drops the
//     in-memory stats;
//   - with flush enabled, a flush aborted by the minimum flush interval
//     leaves the in-memory stats in place.
//
// The second subtest runs against the cluster settings left by the first.
func TestInMemoryStatsDiscard(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	ctx := context.Background()
	params, _ := tests.CreateTestServerParams()
	s, conn, _ := serverutils.StartServer(t, params)
	observer := serverutils.OpenDBConn(
		t, s.ServingSQLAddr(), "", false /* insecure */, s.Stopper())
	defer s.Stopper().Stop(ctx)

	// sqlConn generates the workload; observerConn runs the assertions on a
	// separate connection.
	sqlConn := sqlutils.MakeSQLRunner(conn)
	observerConn := sqlutils.MakeSQLRunner(observer)
	sqlStats := s.SQLServer().(*sql.Server).
		GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats)

	t.Run("flush_disabled", func(t *testing.T) {
		// The query text is kept flush-left so the SQL sent to the server is
		// unchanged.
		const stmtCountQuery = `
SELECT count(*)
FROM crdb_internal.statement_statistics
WHERE app_name = 'flush_disabled_test'
`

		sqlConn.Exec(t,
			"SET CLUSTER SETTING sql.stats.flush.discard_in_memory_stats_when_flush_disabled.enabled = true")
		sqlConn.Exec(t,
			"SET CLUSTER SETTING sql.stats.flush.enabled = false")

		sqlConn.Exec(t, "SET application_name = 'flush_disabled_test'")
		sqlConn.Exec(t, "SELECT 1")

		observerConn.CheckQueryResults(t, stmtCountQuery, [][]string{{"1"}})

		// With flush disabled and discard allowed, Flush should only wipe the
		// in-memory stats.
		sqlStats.Flush(ctx)

		observerConn.CheckQueryResults(t, stmtCountQuery, [][]string{{"0"}})
	})

	t.Run("flush_enabled", func(t *testing.T) {
		const combinedStmtCountQuery = `
SELECT count(*)
FROM crdb_internal.statement_statistics
WHERE app_name = 'flush_enabled_test'
`
		const combinedTxnCountQuery = `
SELECT count(*)
FROM crdb_internal.transaction_statistics
WHERE app_name = 'flush_enabled_test'
`
		const persistedStmtCountQuery = `
SELECT count(*)
FROM system.statement_statistics
WHERE app_name = 'flush_enabled_test'
`
		const persistedTxnCountQuery = `
SELECT count(*)
FROM system.transaction_statistics
WHERE app_name = 'flush_enabled_test'
`

		// Re-enable SQL Stats flush. A flush that is aborted for violating the
		// minimum flush interval must not clear the in-memory stats.
		sqlConn.Exec(t,
			"SET CLUSTER SETTING sql.stats.flush.enabled = true")
		sqlConn.Exec(t, "SET application_name = 'flush_enabled_test'")
		sqlConn.Exec(t, "SELECT 1")

		observerConn.CheckQueryResults(t, combinedStmtCountQuery, [][]string{{"1"}})
		observerConn.CheckQueryResults(t, combinedTxnCountQuery, [][]string{{"1"}})

		// The first flush should write everything into the system tables.
		sqlStats.Flush(ctx)

		observerConn.CheckQueryResults(t, persistedStmtCountQuery, [][]string{{"1"}})
		observerConn.CheckQueryResults(t, persistedTxnCountQuery, [][]string{{"1"}})

		sqlConn.Exec(t, "SELECT 1,1")

		// The second flush should be aborted for violating the minimum flush
		// interval, so the system tables are unchanged while the new
		// fingerprint stays visible through crdb_internal.
		sqlStats.Flush(ctx)

		observerConn.CheckQueryResults(t, persistedStmtCountQuery, [][]string{{"1"}})
		observerConn.CheckQueryResults(t, persistedTxnCountQuery, [][]string{{"1"}})
		observerConn.CheckQueryResults(t, combinedStmtCountQuery, [][]string{{"2"}})
		observerConn.CheckQueryResults(t, combinedTxnCountQuery, [][]string{{"2"}})
	})
}

type stubTime struct {
syncutil.RWMutex
t time.Time
Expand Down
5 changes: 1 addition & 4 deletions pkg/sql/sqlstats/persistedsqlstats/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,7 @@ func (s *PersistedSQLStats) startSQLStatsFlushLoop(ctx context.Context, stopper
return
}

enabled := SQLStatsFlushEnabled.Get(&s.cfg.Settings.SV)
if enabled {
s.Flush(ctx)
}
s.Flush(ctx)
}
})
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
exec-sql
SET CLUSTER SETTING sql.stats.flush.minimum_interval = '0s'
----

exec-sql
SET application_name = 'consistent-test'
----
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
exec-sql
SET CLUSTER SETTING sql.stats.flush.minimum_interval = '0s'
----

exec-sql
SET application_name = 'test'
----
Expand Down Expand Up @@ -170,6 +174,9 @@ WHERE
WHERE
jsonb_array_length(metadata -> 'stmtFingerprintIDs') >= 3
)
ORDER BY
(statistics -> 'statistics' ->> 'cnt')::INT
ASC
----
df3c70bf7729b433,705fcdf3f12803ec,SELECT _,2
15c74af7a18bd6da,baa4f7bb278a6105,SELECT _, _,1
df3c70bf7729b433,705fcdf3f12803ec,SELECT _,2

0 comments on commit b8d9854

Please sign in to comment.