Skip to content

Commit

Permalink
server: fix bug where SQL stats were not flushed into reportable stats
Browse files Browse the repository at this point in the history
Touches #49388.

This was caused by the background loops use an incorrect setup to reset
collected stats.

Also, fix a bug where statements issued at the same time that the SQL
stats are flushed would not be reported.

Release note: None
  • Loading branch information
rohany committed Jul 29, 2020
1 parent 97b6fc0 commit d1aea9a
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 31 deletions.
61 changes: 61 additions & 0 deletions pkg/server/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,15 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
)

func TestTelemetrySQLStatsIndependence(t *testing.T) {
Expand Down Expand Up @@ -71,6 +76,62 @@ CREATE TABLE t.test (x INT PRIMARY KEY);
}
}

func TestEnsureSQLStatsAreFlushedForTelemetry(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
params, _ := tests.CreateTestServerParams()
params.Settings = cluster.MakeClusterSettings()
// Set the SQL stat refresh rate very low so that SQL stats are continuously
// flushed into the telemetry reporting stats pool.
sql.SQLStatReset.Override(&params.Settings.SV, 10*time.Millisecond)
s, sqlDB, _ := serverutils.StartServer(t, params)
defer s.Stopper().Stop(ctx)

// Run some queries against the database.
if _, err := sqlDB.Exec(`
CREATE DATABASE t;
CREATE TABLE t.test (x INT PRIMARY KEY);
INSERT INTO t.test VALUES (1);
INSERT INTO t.test VALUES (2);
`); err != nil {
t.Fatal(err)
}

statusServer := s.(*TestServer).status
sqlServer := s.(*TestServer).Server.pgServer.SQLServer
testutils.SucceedsSoon(t, func() error {
// Get the diagnostic info.
res, err := statusServer.Diagnostics(ctx, &serverpb.DiagnosticsRequest{NodeId: "local"})
if err != nil {
t.Fatal(err)
}

found := false
for _, stat := range res.SqlStats {
// These stats are scrubbed, so look for our scrubbed statement.
if strings.HasPrefix(stat.Key.Query, "INSERT INTO _ VALUES (_)") {
found = true
}
}

if !found {
return errors.New("expected to find query stats, but didn't")
}

// We should also not find the stat in the SQL stats pool, since the SQL
// stats are getting flushed.
stats := sqlServer.GetScrubbedStmtStats()
for _, stat := range stats {
// These stats are scrubbed, so look for our scrubbed statement.
if strings.HasPrefix(stat.Key.Query, "INSERT INTO _ VALUES (_)") {
t.Error("expected to not find stat, but did")
}
}
return nil
})
}

func TestSQLStatCollection(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
Expand Down
47 changes: 29 additions & 18 deletions pkg/sql/app_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,35 +328,31 @@ func (s *sqlStats) getStatsForApplication(appName string) *appStats {
return a
}

func (s *sqlStats) Add(other *sqlStats) {
other.Lock()
appStatsCopy := make(map[string]*appStats)
for k, v := range other.apps {
appStatsCopy[k] = v
}
other.Unlock()
for k, v := range appStatsCopy {
stats := s.getStatsForApplication(k)
// Add manages locks for itself, so we don't need to guard it with locks.
stats.Add(v)
}
}

// resetStats clears all the stored per-app and per-statement
// statistics.
func (s *sqlStats) resetStats(ctx context.Context) {
// resetAndMaybeDumpStats clears all the stored per-app and per-statement
// statistics. If target s not nil, then the stats in s will be flushed
// into target.
func (s *sqlStats) resetAndMaybeDumpStats(ctx context.Context, target *sqlStats) {
// Note: we do not clear the entire s.apps map here. We would need
// to do so to prevent problems with a runaway client running `SET
// APPLICATION_NAME=...` with a different name every time. However,
// any ongoing open client session at the time of the reset has
// cached a pointer to its appStats struct and would thus continue
// to report its stats in an object now invisible to the other tools
// to report its stats in an object now invisible to the target tools
// (virtual table, marshaling, etc.). It's a judgement call, but
// for now we prefer to see more data and thus not clear the map, at
// the risk of seeing the map grow unboundedly with the number of
// different application_names seen so far.

// appStatsCopy will hold a snapshot of the stats being cleared
// to dump into target.
var appStatsCopy map[string]*appStats

s.Lock()

if target != nil {
appStatsCopy = make(map[string]*appStats, len(s.apps))
}

// Clear the per-apps maps manually,
// because any SQL session currently open has cached the
// pointer to its appStats object and will continue to
Expand All @@ -373,13 +369,28 @@ func (s *sqlStats) resetStats(ctx context.Context) {
dumpStmtStats(ctx, appName, a.stmts)
}

// Only save a copy of a if we need to dump a copy of the stats.
if target != nil {
aCopy := &appStats{st: a.st, stmts: a.stmts}
appStatsCopy[appName] = aCopy
}

// Clear the map, to release the memory; make the new map somewhat already
// large for the likely future workload.
a.stmts = make(map[stmtKey]*stmtStats, len(a.stmts)/2)
a.Unlock()
}
s.lastReset = timeutil.Now()
s.Unlock()

// Dump the copied stats into target.
if target != nil {
for k, v := range appStatsCopy {
stats := target.getStatsForApplication(k)
// Add manages locks for itself, so we don't need to guard it with locks.
stats.Add(v)
}
}
}

func (s *sqlStats) getLastReset() time.Time {
Expand Down
39 changes: 26 additions & 13 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,8 +235,11 @@ type Server struct {
cfg *ExecutorConfig

// sqlStats tracks per-application statistics for all applications on each
// node.
sqlStats sqlStats
// node. Newly collected statistics flow into sqlStats.
sqlStats sqlStats
// reportedStats is a pool of stats that is held for reporting, and is
// cleared on a lower interval than sqlStats. Stats from sqlStats flow
// into reported stats when sqlStats is cleared.
reportedStats sqlStats

reCache *tree.RegexpCache
Expand Down Expand Up @@ -336,22 +339,21 @@ func (s *Server) Start(ctx context.Context, stopper *stop.Stopper) {
// continually allocating space for the SQL stats. Additionally, spawn
// a loop to clear the reported stats at the same large interval just
// in case the telemetry worker fails.
s.PeriodicallyClearSQLStats(ctx, stopper, maxSQLStatReset, &s.sqlStats)
s.PeriodicallyClearSQLStats(ctx, stopper, maxSQLStatReset, &s.reportedStats)
s.PeriodicallyClearSQLStats(ctx, stopper, MaxSQLStatReset, &s.sqlStats, s.ResetSQLStats)
s.PeriodicallyClearSQLStats(ctx, stopper, MaxSQLStatReset, &s.reportedStats, s.ResetReportedStats)
// Start a second loop to clear SQL stats at the requested interval.
s.PeriodicallyClearSQLStats(ctx, stopper, sqlStatReset, &s.sqlStats)
s.PeriodicallyClearSQLStats(ctx, stopper, SQLStatReset, &s.sqlStats, s.ResetSQLStats)
}

// ResetSQLStats resets the executor's collected sql statistics.
func (s *Server) ResetSQLStats(ctx context.Context) {
// Dump the SQL stats into the reported stats before clearing the SQL stats.
s.reportedStats.Add(&s.sqlStats)
s.sqlStats.resetStats(ctx)
s.sqlStats.resetAndMaybeDumpStats(ctx, &s.reportedStats)
}

// ResetReportedStats resets the executor's collected reported stats.
func (s *Server) ResetReportedStats(ctx context.Context) {
s.reportedStats.resetStats(ctx)
s.reportedStats.resetAndMaybeDumpStats(ctx, nil /* target */)
}

// GetScrubbedStmtStats returns the statement statistics by app, with the
Expand Down Expand Up @@ -727,15 +729,19 @@ func (s *Server) newConnExecutorWithTxn(
return ex, nil
}

var sqlStatReset = settings.RegisterPublicNonNegativeDurationSettingWithMaximum(
// SQLStatReset is the cluster setting that controls at what interval SQL
// statement statistics should be reset.
var SQLStatReset = settings.RegisterPublicNonNegativeDurationSettingWithMaximum(
"diagnostics.sql_stat_reset.interval",
"interval controlling how often SQL statement statistics should "+
"be reset (should be less than diagnostics.forced_sql_stat_reset.interval). It has a max value of 24H.",
time.Hour,
time.Hour*24,
)

var maxSQLStatReset = settings.RegisterPublicNonNegativeDurationSettingWithMaximum(
// MaxSQLStatReset is the cluster setting that controls at what interval SQL
// statement statistics must be flushed within.
var MaxSQLStatReset = settings.RegisterPublicNonNegativeDurationSettingWithMaximum(
"diagnostics.forced_sql_stat_reset.interval",
"interval after which SQL statement statistics are refreshed even "+
"if not collected (should be more than diagnostics.sql_stat_reset.interval). It has a max value of 24H.",
Expand All @@ -744,9 +750,16 @@ var maxSQLStatReset = settings.RegisterPublicNonNegativeDurationSettingWithMaxim
)

// PeriodicallyClearSQLStats spawns a loop to reset stats based on the setting
// of a given duration settings variable.
// of a given duration settings variable. We take in a function to actually do
// the resetting, as some stats have extra work that needs to be performed
// during the reset. For example, the SQL stats need to dump into the parent
// stats before clearing data fully.
func (s *Server) PeriodicallyClearSQLStats(
ctx context.Context, stopper *stop.Stopper, setting *settings.DurationSetting, stats *sqlStats,
ctx context.Context,
stopper *stop.Stopper,
setting *settings.DurationSetting,
stats *sqlStats,
reset func(ctx context.Context),
) {
stopper.RunWorker(ctx, func(ctx context.Context) {
var timer timeutil.Timer
Expand All @@ -758,7 +771,7 @@ func (s *Server) PeriodicallyClearSQLStats(
next := last.Add(setting.Get(&s.cfg.Settings.SV))
wait := next.Sub(timeutil.Now())
if wait < 0 {
stats.resetStats(ctx)
reset(ctx)
} else {
timer.Reset(wait)
select {
Expand Down

0 comments on commit d1aea9a

Please sign in to comment.