Skip to content

Commit

Permalink
Merge pull request #78967 from cockroachdb/blathers/backport-release-…
Browse files Browse the repository at this point in the history
…22.1-78446

release-22.1: sql: new SQL Stats cluster settings to improve write traffic
  • Loading branch information
Azhng authored Mar 30, 2022
2 parents 34daf80 + 7c56ef0 commit eb7125b
Show file tree
Hide file tree
Showing 5 changed files with 250 additions and 10 deletions.
26 changes: 26 additions & 0 deletions pkg/sql/sqlstats/persistedsqlstats/cluster_settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,32 @@ var SQLStatsFlushInterval = settings.RegisterDurationSetting(
settings.NonNegativeDurationWithMaximum(time.Hour*24),
).WithPublic()

// MinimumInterval is the cluster setting that controls the minimum interval
// between each flush operation. If flush operations get triggered faster
// than what is allowed by this setting, (e.g. when too many fingerprints are
// generated in a short span of time, which in turn cause memory pressure), the
// flush operation will be aborted.
var MinimumInterval = settings.RegisterDurationSetting(
settings.TenantWritable,
"sql.stats.flush.minimum_interval",
"the minimum interval that SQL stats can be flushes to disk. If a "+
"flush operation starts within less than the minimum interval, the flush "+
"operation will be aborted",
0,
settings.NonNegativeDuration,
)

// DiscardInMemoryStatsWhenFlushDisabled is the cluster setting that allows the
// older in-memory SQL stats to be discarded when flushing to persisted tables
// is disabled.
var DiscardInMemoryStatsWhenFlushDisabled = settings.RegisterBoolSetting(
settings.TenantWritable,
"sql.stats.flush.force_cleanup.enabled",
"if set, older SQL stats are discarded periodically when flushing to "+
"persisted tables is disabled",
false,
)

// SQLStatsFlushEnabled is the cluster setting that controls if the sqlstats
// subsystem persists the statistics into system table.
var SQLStatsFlushEnabled = settings.RegisterBoolSetting(
Expand Down
41 changes: 36 additions & 5 deletions pkg/sql/sqlstats/persistedsqlstats/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,49 @@ import (
// Flush flushes in-memory sql stats into system table. Any errors encountered
// during the flush will be logged as warning.
func (s *PersistedSQLStats) Flush(ctx context.Context) {
now := s.getTimeNow()

allowDiscardWhenDisabled := DiscardInMemoryStatsWhenFlushDisabled.Get(&s.cfg.Settings.SV)
minimumFlushInterval := MinimumInterval.Get(&s.cfg.Settings.SV)

enabled := SQLStatsFlushEnabled.Get(&s.cfg.Settings.SV)
flushingTooSoon := now.Before(s.lastFlushStarted.Add(minimumFlushInterval))

// Handle wiping in-memory stats here, we only wipe in-memory stats under 2
// circumstances:
// 1. flush is enabled, and we are not early aborting the flush due to flushing
// too frequently.
// 2. flush is disabled, but we allow discard in-memory stats when disabled.
shouldWipeInMemoryStats := enabled && !flushingTooSoon
shouldWipeInMemoryStats = shouldWipeInMemoryStats || (!enabled && allowDiscardWhenDisabled)

if shouldWipeInMemoryStats {
defer func() {
if err := s.SQLStats.Reset(ctx); err != nil {
log.Warningf(ctx, "fail to reset in-memory SQL Stats: %s", err)
}
}()
}

// Handle early abortion of the flush.
if !enabled {
return
}

if flushingTooSoon {
log.Infof(ctx, "flush aborted due to high flush frequency. "+
"The minimum interval between flushes is %s", minimumFlushInterval.String())
return
}

s.lastFlushStarted = now
log.Infof(ctx, "flushing %d stmt/txn fingerprints (%d bytes) after %s",
s.SQLStats.GetTotalFingerprintCount(), s.SQLStats.GetTotalFingerprintBytes(), timeutil.Since(s.lastFlushStarted))

aggregatedTs := s.ComputeAggregatedTs()
s.lastFlushStarted = s.getTimeNow()

s.flushStmtStats(ctx, aggregatedTs)
s.flushTxnStats(ctx, aggregatedTs)

if err := s.SQLStats.Reset(ctx); err != nil {
log.Warningf(ctx, "fail to reset in-memory SQL Stats: %s", err)
}
}

func (s *PersistedSQLStats) flushStmtStats(ctx context.Context, aggregatedTs time.Time) {
Expand Down
183 changes: 183 additions & 0 deletions pkg/sql/sqlstats/persistedsqlstats/flush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,189 @@ func TestSQLStatsInitialDelay(t *testing.T) {
"expected latest nextFlushAt to be %s, but found %s", maxNextRunAt, initialNextFlushAt)
}

func TestSQLStatsMinimumFlushInterval(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
fakeTime := stubTime{
aggInterval: time.Hour,
}
fakeTime.setTime(timeutil.Now())

params, _ := tests.CreateTestServerParams()
params.Knobs.SQLStatsKnobs = &sqlstats.TestingKnobs{
StubTimeNow: fakeTime.Now,
}
s, conn, _ := serverutils.StartServer(t, params)

defer s.Stopper().Stop(context.Background())

sqlConn := sqlutils.MakeSQLRunner(conn)

sqlConn.Exec(t, "SET CLUSTER SETTING sql.stats.flush.minimum_interval = '10m'")
sqlConn.Exec(t, "SET application_name = 'min_flush_test'")
sqlConn.Exec(t, "SELECT 1")

s.SQLServer().(*sql.Server).
GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats).Flush(ctx)

sqlConn.CheckQueryResults(t, `
SELECT count(*)
FROM system.statement_statistics
WHERE app_name = 'min_flush_test'
`, [][]string{{"1"}})

sqlConn.CheckQueryResults(t, `
SELECT count(*)
FROM system.transaction_statistics
WHERE app_name = 'min_flush_test'
`, [][]string{{"1"}})

// Since by default, the minimum flush interval is 10 minutes, a subsequent
// flush should be no-op.
s.SQLServer().(*sql.Server).
GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats).Flush(ctx)

sqlConn.CheckQueryResults(t, `
SELECT count(*)
FROM system.statement_statistics
WHERE app_name = 'min_flush_test'
`, [][]string{{"1"}})

sqlConn.CheckQueryResults(t, `
SELECT count(*)
FROM system.transaction_statistics
WHERE app_name = 'min_flush_test'
`, [][]string{{"1"}})

// We manually set the time to past the minimum flush interval, now the flush
// should succeed.
fakeTime.setTime(fakeTime.Now().Add(time.Hour))

s.SQLServer().(*sql.Server).
GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats).Flush(ctx)

sqlConn.CheckQueryResults(t, `
SELECT count(*) > 1
FROM system.statement_statistics
WHERE app_name = 'min_flush_test'
`, [][]string{{"true"}})

}

func TestInMemoryStatsDiscard(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
params, _ := tests.CreateTestServerParams()
s, conn, _ := serverutils.StartServer(t, params)
observer :=
serverutils.OpenDBConn(t, s.ServingSQLAddr(), "", false /* insecure */, s.Stopper())

defer s.Stopper().Stop(context.Background())

sqlConn := sqlutils.MakeSQLRunner(conn)
sqlConn.Exec(t,
"SET CLUSTER SETTING sql.stats.flush.minimum_interval = '10m'")
observerConn := sqlutils.MakeSQLRunner(observer)

t.Run("flush_disabled", func(t *testing.T) {
sqlConn.Exec(t,
"SET CLUSTER SETTING sql.stats.flush.force_cleanup.enabled = true")
sqlConn.Exec(t,
"SET CLUSTER SETTING sql.stats.flush.enabled = false")

sqlConn.Exec(t, "SET application_name = 'flush_disabled_test'")
sqlConn.Exec(t, "SELECT 1")

observerConn.CheckQueryResults(t, `
SELECT count(*)
FROM crdb_internal.statement_statistics
WHERE app_name = 'flush_disabled_test'
`, [][]string{{"1"}})

s.SQLServer().(*sql.Server).
GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats).Flush(ctx)

observerConn.CheckQueryResults(t, `
SELECT count(*)
FROM crdb_internal.statement_statistics
WHERE app_name = 'flush_disabled_test'
`, [][]string{{"0"}})
})

t.Run("flush_enabled", func(t *testing.T) {
// Now turn back SQL Stats flush. If the flush is aborted due to violating
// minimum flush interval constraint, we should not be clearing in-memory
// stats.
sqlConn.Exec(t,
"SET CLUSTER SETTING sql.stats.flush.enabled = true")
sqlConn.Exec(t, "SET application_name = 'flush_enabled_test'")
sqlConn.Exec(t, "SELECT 1")

observerConn.CheckQueryResults(t, `
SELECT count(*)
FROM crdb_internal.statement_statistics
WHERE app_name = 'flush_enabled_test'
`, [][]string{{"1"}})

observerConn.CheckQueryResults(t, `
SELECT count(*)
FROM crdb_internal.transaction_statistics
WHERE app_name = 'flush_enabled_test'
`, [][]string{{"1"}})

// First flush should flush everything into the system tables.
s.SQLServer().(*sql.Server).
GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats).Flush(ctx)

observerConn.CheckQueryResults(t, `
SELECT count(*)
FROM system.statement_statistics
WHERE app_name = 'flush_enabled_test'
`, [][]string{{"1"}})

observerConn.CheckQueryResults(t, `
SELECT count(*)
FROM system.transaction_statistics
WHERE app_name = 'flush_enabled_test'
`, [][]string{{"1"}})

sqlConn.Exec(t, "SELECT 1,1")

// Second flush should be aborted due to violating the minimum flush
// interval requirement. Though the data should still remain in-memory.
s.SQLServer().(*sql.Server).
GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats).Flush(ctx)

observerConn.CheckQueryResults(t, `
SELECT count(*)
FROM system.statement_statistics
WHERE app_name = 'flush_enabled_test'
`, [][]string{{"1"}})

observerConn.CheckQueryResults(t, `
SELECT count(*)
FROM system.transaction_statistics
WHERE app_name = 'flush_enabled_test'
`, [][]string{{"1"}})

observerConn.CheckQueryResults(t, `
SELECT count(*)
FROM crdb_internal.statement_statistics
WHERE app_name = 'flush_enabled_test'
`, [][]string{{"2"}})

observerConn.CheckQueryResults(t, `
SELECT count(*)
FROM crdb_internal.transaction_statistics
WHERE app_name = 'flush_enabled_test'
`, [][]string{{"2"}})
})
}

type stubTime struct {
syncutil.RWMutex
t time.Time
Expand Down
5 changes: 1 addition & 4 deletions pkg/sql/sqlstats/persistedsqlstats/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,7 @@ func (s *PersistedSQLStats) startSQLStatsFlushLoop(ctx context.Context, stopper
return
}

enabled := SQLStatsFlushEnabled.Get(&s.cfg.Settings.SV)
if enabled {
s.Flush(ctx)
}
s.Flush(ctx)
}
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,9 @@ WHERE
WHERE
jsonb_array_length(metadata -> 'stmtFingerprintIDs') >= 3
)
ORDER BY
(statistics -> 'statistics' ->> 'cnt')::INT
ASC
----
df3c70bf7729b433,705fcdf3f12803ec,SELECT _,2
15c74af7a18bd6da,baa4f7bb278a6105,SELECT _, _,1
df3c70bf7729b433,705fcdf3f12803ec,SELECT _,2

0 comments on commit eb7125b

Please sign in to comment.