diff --git a/pkg/server/stats_test.go b/pkg/server/stats_test.go index 9b411419127d..ebe1d75949cf 100644 --- a/pkg/server/stats_test.go +++ b/pkg/server/stats_test.go @@ -14,12 +14,18 @@ import ( "context" "strings" "testing" + "time" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server/serverpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/tests" + "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/errors" ) func TestTelemetrySQLStatsIndependence(t *testing.T) { @@ -69,6 +75,50 @@ CREATE TABLE t.test (x INT PRIMARY KEY); } } +func TestEnsureSQLStatsAreFlushedForTelemetry(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + params, _ := tests.CreateTestServerParams() + params.Settings = cluster.MakeClusterSettings() + // Set the SQL stat refresh rate very low so that SQL stats are continuously + // flushed into the telemetry reporting stats pool. + sql.SQLStatReset.Override(¶ms.Settings.SV, 10*time.Millisecond) + s, sqlDB, _ := serverutils.StartServer(t, params) + defer s.Stopper().Stop(ctx) + + // Run some queries against the database. + if _, err := sqlDB.Exec(` +CREATE DATABASE t; +CREATE TABLE t.test (x INT PRIMARY KEY); +INSERT INTO t.test VALUES (1); +INSERT INTO t.test VALUES (2); +`); err != nil { + t.Fatal(err) + } + + statusServer := s.(*TestServer).status + testutils.SucceedsSoon(t, func() error { + // Get the diagnostic info. + res, err := statusServer.Diagnostics(ctx, &serverpb.DiagnosticsRequest{NodeId: "local"}) + if err != nil { + t.Fatal(err) + } + + found := false + for _, stat := range res.SqlStats { + // These stats are scrubbed, so look for our scrubbed statement. + if strings.HasPrefix(stat.Key.Query, "INSERT INTO _ VALUES (_)") { + found = true + } + } + + if !found { + return errors.New("expected to find query stats, but didn't") + } + return nil + }) +} + func TestSQLStatCollection(t *testing.T) { defer leaktest.AfterTest(t)() ctx := context.Background() diff --git a/pkg/sql/app_stats.go b/pkg/sql/app_stats.go index eb981e3b3e27..a726cbe2726a 100644 --- a/pkg/sql/app_stats.go +++ b/pkg/sql/app_stats.go @@ -328,35 +328,31 @@ func (s *sqlStats) getStatsForApplication(appName string) *appStats { return a } -func (s *sqlStats) Add(other *sqlStats) { - other.Lock() - appStatsCopy := make(map[string]*appStats) - for k, v := range other.apps { - appStatsCopy[k] = v - } - other.Unlock() - for k, v := range appStatsCopy { - stats := s.getStatsForApplication(k) - // Add manages locks for itself, so we don't need to guard it with locks. - stats.Add(v) - } -} - -// resetStats clears all the stored per-app and per-statement -// statistics. -func (s *sqlStats) resetStats(ctx context.Context) { +// resetAndMaybeDumpStats clears all the stored per-app and per-statement +// statistics. If target s not nil, then the stats in s will be flushed +// into target. +func (s *sqlStats) resetAndMaybeDumpStats(ctx context.Context, target *sqlStats) { // Note: we do not clear the entire s.apps map here. We would need // to do so to prevent problems with a runaway client running `SET // APPLICATION_NAME=...` with a different name every time. However, // any ongoing open client session at the time of the reset has // cached a pointer to its appStats struct and would thus continue - // to report its stats in an object now invisible to the other tools + // to report its stats in an object now invisible to the target tools // (virtual table, marshaling, etc.). It's a judgement call, but // for now we prefer to see more data and thus not clear the map, at // the risk of seeing the map grow unboundedly with the number of // different application_names seen so far. + // appStatsCopy will hold a snapshot of the stats being cleared + // to dump into target. + var appStatsCopy map[string]*appStats + s.Lock() + + if target != nil { + appStatsCopy = make(map[string]*appStats, len(s.apps)) + } + // Clear the per-apps maps manually, // because any SQL session currently open has cached the // pointer to its appStats object and will continue to @@ -373,6 +369,12 @@ func (s *sqlStats) resetStats(ctx context.Context) { dumpStmtStats(ctx, appName, a.stmts) } + // Only save a copy of a if we need to dump a copy of the stats. + if target != nil { + aCopy := &appStats{st: a.st, stmts: a.stmts} + appStatsCopy[appName] = aCopy + } + // Clear the map, to release the memory; make the new map somewhat already // large for the likely future workload. a.stmts = make(map[stmtKey]*stmtStats, len(a.stmts)/2) @@ -380,6 +382,15 @@ func (s *sqlStats) resetStats(ctx context.Context) { } s.lastReset = timeutil.Now() s.Unlock() + + // Dump the copied stats into target. + if target != nil { + for k, v := range appStatsCopy { + stats := target.getStatsForApplication(k) + // Add manages locks for itself, so we don't need to guard it with locks. + stats.Add(v) + } + } } func (s *sqlStats) getLastReset() time.Time { diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index 820d69a359c9..7572f2248a25 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -236,8 +236,11 @@ type Server struct { cfg *ExecutorConfig // sqlStats tracks per-application statistics for all applications on each - // node. - sqlStats sqlStats + // node. Newly collected statistics flow into sqlStats. + sqlStats sqlStats + // reportedStats is a pool of stats that is held for reporting, and is + // cleared on a lower interval than sqlStats. Stats from sqlStats flow + // into reported stats when sqlStats is cleared. reportedStats sqlStats reCache *tree.RegexpCache @@ -337,22 +340,21 @@ func (s *Server) Start(ctx context.Context, stopper *stop.Stopper) { // continually allocating space for the SQL stats. Additionally, spawn // a loop to clear the reported stats at the same large interval just // in case the telemetry worker fails. - s.PeriodicallyClearSQLStats(ctx, stopper, maxSQLStatReset, &s.sqlStats) - s.PeriodicallyClearSQLStats(ctx, stopper, maxSQLStatReset, &s.reportedStats) + s.PeriodicallyClearSQLStats(ctx, stopper, MaxSQLStatReset, &s.sqlStats, s.ResetSQLStats) + s.PeriodicallyClearSQLStats(ctx, stopper, MaxSQLStatReset, &s.reportedStats, s.ResetReportedStats) // Start a second loop to clear SQL stats at the requested interval. - s.PeriodicallyClearSQLStats(ctx, stopper, sqlStatReset, &s.sqlStats) + s.PeriodicallyClearSQLStats(ctx, stopper, SQLStatReset, &s.sqlStats, s.ResetSQLStats) } // ResetSQLStats resets the executor's collected sql statistics. func (s *Server) ResetSQLStats(ctx context.Context) { // Dump the SQL stats into the reported stats before clearing the SQL stats. - s.reportedStats.Add(&s.sqlStats) - s.sqlStats.resetStats(ctx) + s.sqlStats.resetAndMaybeDumpStats(ctx, &s.reportedStats) } // ResetReportedStats resets the executor's collected reported stats. func (s *Server) ResetReportedStats(ctx context.Context) { - s.reportedStats.resetStats(ctx) + s.reportedStats.resetAndMaybeDumpStats(ctx, nil /* target */) } // GetScrubbedStmtStats returns the statement statistics by app, with the @@ -701,7 +703,9 @@ func (s *Server) newConnExecutorWithTxn( return ex } -var sqlStatReset = settings.RegisterPublicNonNegativeDurationSettingWithMaximum( +// SQLStatReset is the cluster setting that controls at what interval SQL +// statement statistics should be reset. +var SQLStatReset = settings.RegisterPublicNonNegativeDurationSettingWithMaximum( "diagnostics.sql_stat_reset.interval", "interval controlling how often SQL statement statistics should "+ "be reset (should be less than diagnostics.forced_sql_stat_reset.interval). It has a max value of 24H.", @@ -709,7 +713,9 @@ var sqlStatReset = settings.RegisterPublicNonNegativeDurationSettingWithMaximum( time.Hour*24, ) -var maxSQLStatReset = settings.RegisterPublicNonNegativeDurationSettingWithMaximum( +// MaxSQLStatReset is the cluster setting that controls at what interval SQL +// statement statistics must be flushed within. +var MaxSQLStatReset = settings.RegisterPublicNonNegativeDurationSettingWithMaximum( "diagnostics.forced_sql_stat_reset.interval", "interval after which SQL statement statistics are refreshed even "+ "if not collected (should be more than diagnostics.sql_stat_reset.interval). It has a max value of 24H.", @@ -718,9 +724,16 @@ var maxSQLStatReset = settings.RegisterPublicNonNegativeDurationSettingWithMaxim ) // PeriodicallyClearSQLStats spawns a loop to reset stats based on the setting -// of a given duration settings variable. +// of a given duration settings variable. We take in a function to actually do +// the resetting, as some stats have extra work that needs to be performed +// during the reset. For example, the SQL stats need to dump into the parent +// stats before clearing data fully. func (s *Server) PeriodicallyClearSQLStats( - ctx context.Context, stopper *stop.Stopper, setting *settings.DurationSetting, stats *sqlStats, + ctx context.Context, + stopper *stop.Stopper, + setting *settings.DurationSetting, + stats *sqlStats, + reset func(ctx context.Context), ) { stopper.RunWorker(ctx, func(ctx context.Context) { var timer timeutil.Timer @@ -732,7 +745,7 @@ func (s *Server) PeriodicallyClearSQLStats( next := last.Add(setting.Get(&s.cfg.Settings.SV)) wait := next.Sub(timeutil.Now()) if wait < 0 { - stats.resetStats(ctx) + reset(ctx) } else { timer.Reset(wait) select {