Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: fix bug where SQL stats were not flushed into reportable stats #49392

Merged
merged 1 commit into from
May 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions pkg/server/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,18 @@ import (
"context"
"strings"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
)

func TestTelemetrySQLStatsIndependence(t *testing.T) {
Expand Down Expand Up @@ -69,6 +75,62 @@ CREATE TABLE t.test (x INT PRIMARY KEY);
}
}

func TestEnsureSQLStatsAreFlushedForTelemetry(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
params, _ := tests.CreateTestServerParams()
params.Settings = cluster.MakeClusterSettings()
// Set the SQL stat refresh rate very low so that SQL stats are continuously
// flushed into the telemetry reporting stats pool.
sql.SQLStatReset.Override(&params.Settings.SV, 10*time.Millisecond)
s, sqlDB, _ := serverutils.StartServer(t, params)
defer s.Stopper().Stop(ctx)

// Run some queries against the database.
if _, err := sqlDB.Exec(`
CREATE DATABASE t;
CREATE TABLE t.test (x INT PRIMARY KEY);
INSERT INTO t.test VALUES (1);
INSERT INTO t.test VALUES (2);
`); err != nil {
t.Fatal(err)
}

statusServer := s.(*TestServer).status
sqlServer := s.(*TestServer).Server.sqlServer.pgServer.SQLServer
testutils.SucceedsSoon(t, func() error {
// Get the diagnostic info.
res, err := statusServer.Diagnostics(ctx, &serverpb.DiagnosticsRequest{NodeId: "local"})
if err != nil {
t.Fatal(err)
}

found := false
for _, stat := range res.SqlStats {
// These stats are scrubbed, so look for our scrubbed statement.
if strings.HasPrefix(stat.Key.Query, "INSERT INTO _ VALUES (_)") {
found = true
}
}

if !found {
return errors.New("expected to find query stats, but didn't")
}

// We should also not find the stat in the SQL stats pool, since the SQL
// stats are getting flushed.
stats := sqlServer.GetScrubbedStmtStats()
for _, stat := range stats {
// These stats are scrubbed, so look for our scrubbed statement.
if strings.HasPrefix(stat.Key.Query, "INSERT INTO _ VALUES (_)") {
t.Error("expected to not find stat, but did")
}
}
return nil
})
}

func TestSQLStatCollection(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
Expand Down
47 changes: 29 additions & 18 deletions pkg/sql/app_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,35 +328,31 @@ func (s *sqlStats) getStatsForApplication(appName string) *appStats {
return a
}

func (s *sqlStats) Add(other *sqlStats) {
other.Lock()
appStatsCopy := make(map[string]*appStats)
for k, v := range other.apps {
appStatsCopy[k] = v
}
other.Unlock()
for k, v := range appStatsCopy {
stats := s.getStatsForApplication(k)
// Add manages locks for itself, so we don't need to guard it with locks.
stats.Add(v)
}
}

// resetStats clears all the stored per-app and per-statement
// statistics.
func (s *sqlStats) resetStats(ctx context.Context) {
// resetAndMaybeDumpStats clears all the stored per-app and per-statement
// statistics. If target s not nil, then the stats in s will be flushed
// into target.
func (s *sqlStats) resetAndMaybeDumpStats(ctx context.Context, target *sqlStats) {
// Note: we do not clear the entire s.apps map here. We would need
// to do so to prevent problems with a runaway client running `SET
// APPLICATION_NAME=...` with a different name every time. However,
// any ongoing open client session at the time of the reset has
// cached a pointer to its appStats struct and would thus continue
// to report its stats in an object now invisible to the other tools
// to report its stats in an object now invisible to the target tools
// (virtual table, marshaling, etc.). It's a judgement call, but
// for now we prefer to see more data and thus not clear the map, at
// the risk of seeing the map grow unboundedly with the number of
// different application_names seen so far.

// appStatsCopy will hold a snapshot of the stats being cleared
// to dump into target.
var appStatsCopy map[string]*appStats

s.Lock()

if target != nil {
appStatsCopy = make(map[string]*appStats, len(s.apps))
}

// Clear the per-apps maps manually,
// because any SQL session currently open has cached the
// pointer to its appStats object and will continue to
Expand All @@ -373,13 +369,28 @@ func (s *sqlStats) resetStats(ctx context.Context) {
dumpStmtStats(ctx, appName, a.stmts)
}

// Only save a copy of a if we need to dump a copy of the stats.
if target != nil {
aCopy := &appStats{st: a.st, stmts: a.stmts}
appStatsCopy[appName] = aCopy
}

// Clear the map, to release the memory; make the new map somewhat already
// large for the likely future workload.
a.stmts = make(map[stmtKey]*stmtStats, len(a.stmts)/2)
a.Unlock()
}
s.lastReset = timeutil.Now()
s.Unlock()

// Dump the copied stats into target.
if target != nil {
for k, v := range appStatsCopy {
stats := target.getStatsForApplication(k)
// Add manages locks for itself, so we don't need to guard it with locks.
stats.Add(v)
}
}
}

func (s *sqlStats) getLastReset() time.Time {
Expand Down
39 changes: 26 additions & 13 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,11 @@ type Server struct {
cfg *ExecutorConfig

// sqlStats tracks per-application statistics for all applications on each
// node.
sqlStats sqlStats
// node. Newly collected statistics flow into sqlStats.
sqlStats sqlStats
// reportedStats is a pool of stats that is held for reporting, and is
// cleared on a lower interval than sqlStats. Stats from sqlStats flow
// into reported stats when sqlStats is cleared.
reportedStats sqlStats

reCache *tree.RegexpCache
Expand Down Expand Up @@ -337,22 +340,21 @@ func (s *Server) Start(ctx context.Context, stopper *stop.Stopper) {
// continually allocating space for the SQL stats. Additionally, spawn
// a loop to clear the reported stats at the same large interval just
// in case the telemetry worker fails.
s.PeriodicallyClearSQLStats(ctx, stopper, maxSQLStatReset, &s.sqlStats)
s.PeriodicallyClearSQLStats(ctx, stopper, maxSQLStatReset, &s.reportedStats)
s.PeriodicallyClearSQLStats(ctx, stopper, MaxSQLStatReset, &s.sqlStats, s.ResetSQLStats)
s.PeriodicallyClearSQLStats(ctx, stopper, MaxSQLStatReset, &s.reportedStats, s.ResetReportedStats)
// Start a second loop to clear SQL stats at the requested interval.
s.PeriodicallyClearSQLStats(ctx, stopper, sqlStatReset, &s.sqlStats)
s.PeriodicallyClearSQLStats(ctx, stopper, SQLStatReset, &s.sqlStats, s.ResetSQLStats)
}

// ResetSQLStats resets the executor's collected sql statistics.
func (s *Server) ResetSQLStats(ctx context.Context) {
// Dump the SQL stats into the reported stats before clearing the SQL stats.
s.reportedStats.Add(&s.sqlStats)
s.sqlStats.resetStats(ctx)
s.sqlStats.resetAndMaybeDumpStats(ctx, &s.reportedStats)
}

// ResetReportedStats resets the executor's collected reported stats.
func (s *Server) ResetReportedStats(ctx context.Context) {
s.reportedStats.resetStats(ctx)
s.reportedStats.resetAndMaybeDumpStats(ctx, nil /* target */)
}

// GetScrubbedStmtStats returns the statement statistics by app, with the
Expand Down Expand Up @@ -701,15 +703,19 @@ func (s *Server) newConnExecutorWithTxn(
return ex
}

var sqlStatReset = settings.RegisterPublicNonNegativeDurationSettingWithMaximum(
// SQLStatReset is the cluster setting that controls at what interval SQL
// statement statistics should be reset.
var SQLStatReset = settings.RegisterPublicNonNegativeDurationSettingWithMaximum(
"diagnostics.sql_stat_reset.interval",
"interval controlling how often SQL statement statistics should "+
"be reset (should be less than diagnostics.forced_sql_stat_reset.interval). It has a max value of 24H.",
time.Hour,
time.Hour*24,
)

var maxSQLStatReset = settings.RegisterPublicNonNegativeDurationSettingWithMaximum(
// MaxSQLStatReset is the cluster setting that controls at what interval SQL
// statement statistics must be flushed within.
var MaxSQLStatReset = settings.RegisterPublicNonNegativeDurationSettingWithMaximum(
"diagnostics.forced_sql_stat_reset.interval",
"interval after which SQL statement statistics are refreshed even "+
"if not collected (should be more than diagnostics.sql_stat_reset.interval). It has a max value of 24H.",
Expand All @@ -718,9 +724,16 @@ var maxSQLStatReset = settings.RegisterPublicNonNegativeDurationSettingWithMaxim
)

// PeriodicallyClearSQLStats spawns a loop to reset stats based on the setting
// of a given duration settings variable.
// of a given duration settings variable. We take in a function to actually do
// the resetting, as some stats have extra work that needs to be performed
// during the reset. For example, the SQL stats need to dump into the parent
// stats before clearing data fully.
func (s *Server) PeriodicallyClearSQLStats(
ctx context.Context, stopper *stop.Stopper, setting *settings.DurationSetting, stats *sqlStats,
ctx context.Context,
stopper *stop.Stopper,
setting *settings.DurationSetting,
stats *sqlStats,
reset func(ctx context.Context),
) {
stopper.RunWorker(ctx, func(ctx context.Context) {
var timer timeutil.Timer
Expand All @@ -732,7 +745,7 @@ func (s *Server) PeriodicallyClearSQLStats(
next := last.Add(setting.Get(&s.cfg.Settings.SV))
wait := next.Sub(timeutil.Now())
if wait < 0 {
stats.resetStats(ctx)
reset(ctx)
} else {
timer.Reset(wait)
select {
Expand Down