From 838350972c1c8f1045836e545916b5873d649593 Mon Sep 17 00:00:00 2001 From: Azhng Date: Thu, 8 Jul 2021 14:10:30 -0400 Subject: [PATCH] sql: implement persistedsqlstats flush logic This commit implements the initial flush logic of the persisted sql stats subsystem. Release note: None --- .../settings/settings-for-tenants.txt | 1 + docs/generated/settings/settings.html | 1 + pkg/BUILD.bazel | 1 + pkg/base/testing_knobs.go | 1 + pkg/server/BUILD.bazel | 1 + pkg/server/server_sql.go | 5 + pkg/sql/BUILD.bazel | 1 + pkg/sql/conn_executor.go | 38 +- pkg/sql/crdb_internal.go | 4 +- pkg/sql/exec_util.go | 20 + pkg/sql/executor_statement_metrics.go | 4 + .../testdata/logic_test/statement_statistics | 6 - .../sqlstats/persistedsqlstats/BUILD.bazel | 59 ++ .../persistedsqlstats/cluster_settings.go | 26 + pkg/sql/sqlstats/persistedsqlstats/flush.go | 528 ++++++++++++++++++ .../sqlstats/persistedsqlstats/flush_test.go | 363 ++++++++++++ .../sqlstats/persistedsqlstats/main_test.go | 29 + .../sqlstats/persistedsqlstats/provider.go | 74 +++ .../sqlstats/persistedsqlstats/test_utils.go | 27 + pkg/sql/sqlstats/sslocal/sql_stats.go | 16 + pkg/sql/sqlstats/sslocal/sslocal_provider.go | 8 +- .../sqlstats/ssmemstorage/ss_mem_storage.go | 8 +- .../sqlstats/ssmemstorage/ss_mem_writer.go | 25 +- pkg/sql/sqlstats/ssprovider.go | 4 +- pkg/ts/catalog/chart_catalog.go | 24 + 25 files changed, 1243 insertions(+), 31 deletions(-) create mode 100644 pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel create mode 100644 pkg/sql/sqlstats/persistedsqlstats/cluster_settings.go create mode 100644 pkg/sql/sqlstats/persistedsqlstats/flush.go create mode 100644 pkg/sql/sqlstats/persistedsqlstats/flush_test.go create mode 100644 pkg/sql/sqlstats/persistedsqlstats/main_test.go create mode 100644 pkg/sql/sqlstats/persistedsqlstats/provider.go create mode 100644 pkg/sql/sqlstats/persistedsqlstats/test_utils.go diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index 5292ac482cff..e9ff77d8d8b5 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -129,6 +129,7 @@ sql.spatial.experimental_box2d_comparison_operators.enabled boolean false enable sql.stats.automatic_collection.enabled boolean true automatic statistics collection mode sql.stats.automatic_collection.fraction_stale_rows float 0.2 target fraction of stale rows per table that will trigger a statistics refresh sql.stats.automatic_collection.min_stale_rows integer 500 target minimum number of stale rows per table that will trigger a statistics refresh +sql.stats.flush.interval duration 1h0m0s the interval at which SQL execution statistics are flushed to disk sql.stats.histogram_collection.enabled boolean true histogram collection mode sql.stats.multi_column_collection.enabled boolean true multi-column statistics collection mode sql.stats.post_events.enabled boolean false if set, an event is logged for every CREATE STATISTICS job diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index e8e8c38317bd..ababff6e9b85 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -133,6 +133,7 @@ sql.stats.automatic_collection.enabledbooleantrueautomatic statistics collection mode sql.stats.automatic_collection.fraction_stale_rowsfloat0.2target fraction of stale rows per table that will trigger a statistics refresh sql.stats.automatic_collection.min_stale_rowsinteger500target minimum number of stale rows per table that will trigger a statistics refresh +sql.stats.flush.intervalduration1h0m0sthe interval at which SQL execution statistics are flushed to disk sql.stats.histogram_collection.enabledbooleantruehistogram collection mode sql.stats.multi_column_collection.enabledbooleantruemulti-column statistics collection mode sql.stats.post_events.enabledbooleanfalseif set, an event is logged for every CREATE STATISTICS job diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index d83a6e7ef1b2..85728c03c0b1 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -295,6 +295,7 @@ ALL_TESTS = [ "//pkg/sql/sqlliveness/slinstance:slinstance_test", "//pkg/sql/sqlliveness/slstorage:slstorage_test", "//pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil:sqlstatsutil_test", + "//pkg/sql/sqlstats/persistedsqlstats:persistedsqlstats_test", "//pkg/sql/stats:stats_test", "//pkg/sql/stmtdiagnostics:stmtdiagnostics_test", "//pkg/sql/tests:tests_test", diff --git a/pkg/base/testing_knobs.go b/pkg/base/testing_knobs.go index b903e8f0ebb1..467eb900c11a 100644 --- a/pkg/base/testing_knobs.go +++ b/pkg/base/testing_knobs.go @@ -39,4 +39,5 @@ type TestingKnobs struct { BackupRestore ModuleTestingKnobs MigrationManager ModuleTestingKnobs IndexUsageStatsKnobs ModuleTestingKnobs + SQLStatsKnobs ModuleTestingKnobs } diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index dc18dc949792..a2d1e68fb3b9 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -145,6 +145,7 @@ go_library( "//pkg/sql/sqlinstance/instanceprovider", "//pkg/sql/sqlliveness", "//pkg/sql/sqlliveness/slprovider", + "//pkg/sql/sqlstats/persistedsqlstats", "//pkg/sql/sqlutil", "//pkg/sql/stats", "//pkg/sql/stmtdiagnostics", diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index 7a934c0d6dbb..69b8c5f24b4c 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -77,6 +77,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sqlinstance/instanceprovider" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slprovider" + "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/stats" "github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics" @@ -548,6 +549,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { if cfg.TestingKnobs.JobsTestingKnobs != nil { distSQLCfg.TestingKnobs.JobsTestingKnobs = cfg.TestingKnobs.JobsTestingKnobs } + distSQLServer := distsql.NewServer(ctx, distSQLCfg, cfg.flowScheduler) execinfrapb.RegisterDistSQLServer(cfg.grpcServer, distSQLServer) @@ -708,6 +710,9 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { if indexUsageStatsKnobs := cfg.TestingKnobs.IndexUsageStatsKnobs; indexUsageStatsKnobs != nil { execCfg.IndexUsageStatsTestingKnobs = indexUsageStatsKnobs.(*idxusage.TestingKnobs) } + if sqlStatsKnobs := cfg.TestingKnobs.SQLStatsKnobs; sqlStatsKnobs != nil { + execCfg.SQLStatsTestingKnobs = sqlStatsKnobs.(*persistedsqlstats.TestingKnobs) + } statsRefresher := stats.MakeRefresher( cfg.Settings, diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index da7cdb6573f3..16568354a35a 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -354,6 +354,7 @@ go_library( "//pkg/sql/sqlfsm", "//pkg/sql/sqlliveness", "//pkg/sql/sqlstats", + "//pkg/sql/sqlstats/persistedsqlstats", "//pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil", "//pkg/sql/sqlstats/sslocal", "//pkg/sql/sqltelemetry", diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index 5c2583767e05..315606fa0d73 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -49,6 +49,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sessionphase" "github.com/cockroachdb/cockroach/pkg/sql/sqlerrors" "github.com/cockroachdb/cockroach/pkg/sql/sqlstats" + "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats" "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/sslocal" "github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics" "github.com/cockroachdb/cockroach/pkg/sql/types" @@ -321,7 +322,7 @@ func NewServer(cfg *ExecutorConfig, pool *mon.BytesMonitor) *Server { nil, /* reportedProvider */ ) reportedSQLStatsController := reportedSQLStats.GetController(cfg.SQLStatusServer) - sqlStats := sslocal.New( + memSQLStats := sslocal.New( cfg.Settings, sqlstats.MaxMemSQLStatsStmtFingerprints, sqlstats.MaxMemSQLStatsTxnFingerprints, @@ -331,14 +332,11 @@ func NewServer(cfg *ExecutorConfig, pool *mon.BytesMonitor) *Server { sqlstats.SQLStatReset, reportedSQLStats, ) - sqlStatsController := sqlStats.GetController(cfg.SQLStatusServer) s := &Server{ cfg: cfg, Metrics: metrics, InternalMetrics: makeMetrics(cfg, true /* internal */), pool: pool, - sqlStats: sqlStats, - sqlStatsController: sqlStatsController, reportedStats: reportedSQLStats, reportedStatsController: reportedSQLStatsController, reCache: tree.NewRegexpCache(512), @@ -349,6 +347,20 @@ func NewServer(cfg *ExecutorConfig, pool *mon.BytesMonitor) *Server { }), } + sqlStatsInternalExecutor := MakeInternalExecutor(context.Background(), s, MemoryMetrics{}, cfg.Settings) + persistedSQLStats := persistedsqlstats.New(&persistedsqlstats.Config{ + Settings: s.cfg.Settings, + InternalExecutor: &sqlStatsInternalExecutor, + KvDB: cfg.DB, + SQLIDContainer: cfg.NodeID, + Knobs: cfg.SQLStatsTestingKnobs, + FlushCounter: metrics.StatsMetrics.SQLStatsFlushStarted, + FailureCounter: metrics.StatsMetrics.SQLStatsFlushFailure, + FlushDuration: metrics.StatsMetrics.SQLStatsFlushDuration, + }, memSQLStats) + + s.sqlStats = persistedSQLStats + s.sqlStatsController = persistedSQLStats.GetController(cfg.SQLStatusServer) return s } @@ -395,7 +407,12 @@ func makeMetrics(cfg *ExecutorConfig, internal bool) Metrics { ), ReportedSQLStatsMemoryCurBytesCount: metric.NewGauge( getMetricMeta(MetaReportedSQLStatsMemCurBytes, internal)), - DiscardedStatsCount: metric.NewCounter(getMetricMeta(MetaDiscardedSQLStats, internal)), + DiscardedStatsCount: metric.NewCounter(getMetricMeta(MetaDiscardedSQLStats, internal)), + SQLStatsFlushStarted: metric.NewCounter(getMetricMeta(MetaSQLStatsFlushStarted, internal)), + SQLStatsFlushFailure: metric.NewCounter(getMetricMeta(MetaSQLStatsFlushFailure, internal)), + SQLStatsFlushDuration: metric.NewLatency( + getMetricMeta(MetaSQLStatsFlushDuration, internal), 6*metricsSampleInterval, + ), }, } } @@ -418,6 +435,11 @@ func (s *Server) GetSQLStatsController() *sslocal.Controller { return s.sqlStatsController } +// GetSQLStatsProvider returns the provider for the sqlstats subsystem. +func (s *Server) GetSQLStatsProvider() sqlstats.Provider { + return s.sqlStats +} + // GetReportedSQLStatsController returns the sqlstats.Controller for the current // sql.Server's reported SQL Stats. func (s *Server) GetReportedSQLStatsController() *sslocal.Controller { @@ -442,7 +464,7 @@ func (s *Server) GetUnscrubbedStmtStats( ctx context.Context, ) ([]roachpb.CollectedStatementStatistics, error) { var stmtStats []roachpb.CollectedStatementStatistics - stmtStatsVisitor := func(stat *roachpb.CollectedStatementStatistics) error { + stmtStatsVisitor := func(_ context.Context, stat *roachpb.CollectedStatementStatistics) error { stmtStats = append(stmtStats, *stat) return nil } @@ -462,7 +484,7 @@ func (s *Server) GetUnscrubbedTxnStats( ctx context.Context, ) ([]roachpb.CollectedTransactionStatistics, error) { var txnStats []roachpb.CollectedTransactionStatistics - txnStatsVisitor := func(_ roachpb.TransactionFingerprintID, stat *roachpb.CollectedTransactionStatistics) error { + txnStatsVisitor := func(_ context.Context, _ roachpb.TransactionFingerprintID, stat *roachpb.CollectedTransactionStatistics) error { txnStats = append(txnStats, *stat) return nil } @@ -490,7 +512,7 @@ func (s *Server) getScrubbedStmtStats( salt := ClusterSecret.Get(&s.cfg.Settings.SV) var scrubbedStats []roachpb.CollectedStatementStatistics - stmtStatsVisitor := func(stat *roachpb.CollectedStatementStatistics) error { + stmtStatsVisitor := func(_ context.Context, stat *roachpb.CollectedStatementStatistics) error { // Scrub the statement itself. scrubbedQueryStr, ok := scrubStmtStatKey(s.cfg.VirtualSchemas, stat.Key.Query) diff --git a/pkg/sql/crdb_internal.go b/pkg/sql/crdb_internal.go index 4884394f0fc7..098551d3912e 100644 --- a/pkg/sql/crdb_internal.go +++ b/pkg/sql/crdb_internal.go @@ -916,7 +916,7 @@ CREATE TABLE crdb_internal.node_statement_statistics ( nodeID, _ := p.execCfg.NodeID.OptionalNodeID() // zero if not available - statementVisitor := func(stats *roachpb.CollectedStatementStatistics) error { + statementVisitor := func(_ context.Context, stats *roachpb.CollectedStatementStatistics) error { anonymized := tree.DNull anonStr, ok := scrubStmtStatKey(p.getVirtualTabler(), stats.Key.Query) if ok { @@ -1051,7 +1051,7 @@ CREATE TABLE crdb_internal.node_transaction_statistics ( nodeID, _ := p.execCfg.NodeID.OptionalNodeID() // zero if not available - transactionVisitor := func(txnFingerprintID roachpb.TransactionFingerprintID, stats *roachpb.CollectedTransactionStatistics) error { + transactionVisitor := func(_ context.Context, txnFingerprintID roachpb.TransactionFingerprintID, stats *roachpb.CollectedTransactionStatistics) error { stmtFingerprintIDsDatum := tree.NewDArray(types.String) for _, stmtFingerprintID := range stats.StatementFingerprintIDs { if err := stmtFingerprintIDsDatum.Append(tree.NewDString(strconv.FormatUint(uint64(stmtFingerprintID), 10))); err != nil { diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index b37cec975686..c0da555bdedc 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -77,6 +77,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" + "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats" "github.com/cockroachdb/cockroach/pkg/sql/stats" "github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics" "github.com/cockroachdb/cockroach/pkg/sql/types" @@ -855,6 +856,24 @@ var ( Measurement: "Discarded SQL Stats", Unit: metric.Unit_COUNT, } + MetaSQLStatsFlushStarted = metric.Metadata{ + Name: "sql.stats.flush.count", + Help: "Number of times SQL Stats are flushed to persistent storage", + Measurement: "SQL Stats Flush", + Unit: metric.Unit_COUNT, + } + MetaSQLStatsFlushFailure = metric.Metadata{ + Name: "sql.stats.flush.error", + Help: "Number of errors encountered when flushing SQL Stats", + Measurement: "SQL Stats Flush", + Unit: metric.Unit_COUNT, + } + MetaSQLStatsFlushDuration = metric.Metadata{ + Name: "sql.stats.flush.duration", + Help: "Time took to in nanoseconds to complete SQL Stats flush", + Measurement: "SQL Stats Flush", + Unit: metric.Unit_NANOSECONDS, + } ) func getMetricMeta(meta metric.Metadata, internal bool) metric.Metadata { @@ -932,6 +951,7 @@ type ExecutorConfig struct { TenantTestingKnobs *TenantTestingKnobs BackupRestoreTestingKnobs *BackupRestoreTestingKnobs IndexUsageStatsTestingKnobs *idxusage.TestingKnobs + SQLStatsTestingKnobs *persistedsqlstats.TestingKnobs // HistogramWindowInterval is (server.Config).HistogramWindowInterval. HistogramWindowInterval time.Duration diff --git a/pkg/sql/executor_statement_metrics.go b/pkg/sql/executor_statement_metrics.go index a07a31511ce6..c14612b38df4 100644 --- a/pkg/sql/executor_statement_metrics.go +++ b/pkg/sql/executor_statement_metrics.go @@ -65,6 +65,10 @@ type StatsMetrics struct { ReportedSQLStatsMemoryCurBytesCount *metric.Gauge DiscardedStatsCount *metric.Counter + + SQLStatsFlushStarted *metric.Counter + SQLStatsFlushFailure *metric.Counter + SQLStatsFlushDuration *metric.Histogram } // StatsMetrics is part of the metric.Struct interface. diff --git a/pkg/sql/logictest/testdata/logic_test/statement_statistics b/pkg/sql/logictest/testdata/logic_test/statement_statistics index 547ae8faeb98..c6ce62af54b1 100644 --- a/pkg/sql/logictest/testdata/logic_test/statement_statistics +++ b/pkg/sql/logictest/testdata/logic_test/statement_statistics @@ -1,11 +1,5 @@ # LogicTest: local !3node-tenant(52763) -# Disable SQL Stats flush to prevents stats from being cleared from the -# in-memory store. - -statement ok -SET CLUSTER SETTING sql.stats.flush.enabled = false; - # Check that node_statement_statistics report per application statement ok diff --git a/pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel b/pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel new file mode 100644 index 000000000000..c8a28a0a2374 --- /dev/null +++ b/pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel @@ -0,0 +1,59 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "persistedsqlstats", + srcs = [ + "cluster_settings.go", + "flush.go", + "provider.go", + "test_utils.go", + ], + importpath = "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats", + visibility = ["//visibility:public"], + deps = [ + "//pkg/base", + "//pkg/kv", + "//pkg/roachpb:with-mocks", + "//pkg/security", + "//pkg/settings", + "//pkg/settings/cluster", + "//pkg/sql/sem/tree", + "//pkg/sql/sessiondata", + "//pkg/sql/sqlstats", + "//pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil", + "//pkg/sql/sqlstats/sslocal", + "//pkg/sql/sqlutil", + "//pkg/util/log", + "//pkg/util/metric", + "//pkg/util/stop", + "//pkg/util/timeutil", + "@com_github_cockroachdb_errors//:errors", + ], +) + +go_test( + name = "persistedsqlstats_test", + srcs = [ + "flush_test.go", + "main_test.go", + ], + deps = [ + ":persistedsqlstats", + "//pkg/base", + "//pkg/roachpb:with-mocks", + "//pkg/security", + "//pkg/security/securitytest", + "//pkg/server", + "//pkg/sql", + "//pkg/sql/sqlstats", + "//pkg/testutils/serverutils", + "//pkg/testutils/sqlutils", + "//pkg/testutils/testcluster", + "//pkg/util/leaktest", + "//pkg/util/log", + "//pkg/util/stop", + "//pkg/util/syncutil", + "//pkg/util/timeutil", + "@com_github_stretchr_testify//require", + ], +) diff --git a/pkg/sql/sqlstats/persistedsqlstats/cluster_settings.go b/pkg/sql/sqlstats/persistedsqlstats/cluster_settings.go new file mode 100644 index 000000000000..2e65b24291a8 --- /dev/null +++ b/pkg/sql/sqlstats/persistedsqlstats/cluster_settings.go @@ -0,0 +1,26 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package persistedsqlstats + +import ( + "time" + + "github.com/cockroachdb/cockroach/pkg/settings" +) + +// SQLStatsFlushInterval is the cluster setting that controls how often the SQL +// stats are flushed to system table. +var SQLStatsFlushInterval = settings.RegisterDurationSetting( + "sql.stats.flush.interval", + "the interval at which SQL execution statistics are flushed to disk", + time.Hour, + settings.NonNegativeDurationWithMaximum(time.Hour*24), +).WithPublic() diff --git a/pkg/sql/sqlstats/persistedsqlstats/flush.go b/pkg/sql/sqlstats/persistedsqlstats/flush.go new file mode 100644 index 000000000000..7d599b2404ef --- /dev/null +++ b/pkg/sql/sqlstats/persistedsqlstats/flush.go @@ -0,0 +1,528 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package persistedsqlstats + +import ( + "context" + "time" + + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/sql/sqlstats" + "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/errors" +) + +// Flush flushes in-memory sql stats into system table. Any errors encountered +// during the flush will be logged as warning. +func (s *PersistedSQLStats) Flush(ctx context.Context) { + log.Infof(ctx, "flushing %d stmt/txn fingerprints (%d bytes) after %s", + s.SQLStats.GetTotalFingerprintCount(), s.SQLStats.GetTotalFingerprintBytes(), timeutil.Since(s.lastFlushStarted)) + + // The flush routine directly logs errors if they are encountered. Therefore, + // no error is returned here. + _ = s.IterateStatementStats(ctx, &sqlstats.IteratorOptions{}, func(ctx context.Context, statistics *roachpb.CollectedStatementStatistics) error { + s.doFlush(ctx, func() error { + return s.doFlushSingleStmtStats(ctx, statistics) + }, "failed to flush statement statistics" /* errMsg */) + + return nil + }) + _ = s.IterateTransactionStats(ctx, &sqlstats.IteratorOptions{}, func(ctx context.Context, key roachpb.TransactionFingerprintID, statistics *roachpb.CollectedTransactionStatistics) error { + s.doFlush(ctx, func() error { + return s.doFlushSingleTxnStats(ctx, key, statistics) + }, "failed to flush transaction statistics" /* errMsg */) + + return nil + }) + + if err := s.SQLStats.Reset(ctx); err != nil { + log.Warningf(ctx, "fail to reset in-memory SQL Stats: %s", err) + } + + s.lastFlushStarted = s.getTimeNow() +} + +func (s *PersistedSQLStats) doFlush(ctx context.Context, workFn func() error, errMsg string) { + var err error + flushBegin := s.getTimeNow() + + defer func() { + if err != nil { + s.cfg.FailureCounter.Inc(1) + log.Warningf(ctx, "%s: %s", errMsg, err) + } + flushDuration := s.getTimeNow().Sub(flushBegin) + s.cfg.FlushDuration.RecordValue(flushDuration.Nanoseconds()) + s.cfg.FlushCounter.Inc(1) + }() + + err = workFn() +} + +func (s *PersistedSQLStats) doFlushSingleTxnStats( + ctx context.Context, + key roachpb.TransactionFingerprintID, + stats *roachpb.CollectedTransactionStatistics, +) error { + return s.cfg.KvDB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + // Explicitly copy the stats variable so the txn closure is retryable. + scopedStats := *stats + + aggregatedTs := s.computeAggregatedTs() + serializedFingerprintID := sqlstatsutil.EncodeUint64ToBytes(uint64(key)) + + insertFn := func(ctx context.Context, txn *kv.Txn) (alreadyExists bool, err error) { + rowsAffected, err := s.insertTransactionStats(ctx, txn, aggregatedTs, serializedFingerprintID, &scopedStats) + + if err != nil { + return false /* alreadyExists */, err + } + + if rowsAffected == 0 { + return true /* alreadyExists */, nil /* err */ + } + + return false /* alreadyExists */, nil /* err */ + } + + readFn := func(ctx context.Context, txn *kv.Txn) error { + persistedData := roachpb.TransactionStatistics{} + err := s.fetchPersistedTransactionStats(ctx, txn, aggregatedTs, serializedFingerprintID, scopedStats.App, &persistedData) + if err != nil { + return err + } + + scopedStats.Stats.Add(&persistedData) + return nil + } + + updateFn := func(ctx context.Context, txn *kv.Txn) error { + return s.updateTransactionStats(ctx, txn, aggregatedTs, serializedFingerprintID, &scopedStats) + } + + err := s.doInsertElseDoUpdate(ctx, txn, insertFn, readFn, updateFn) + if err != nil { + return errors.Wrapf(err, "flushing transaction %d's statistics", key) + } + return nil + }) +} + +func (s *PersistedSQLStats) doFlushSingleStmtStats( + ctx context.Context, stats *roachpb.CollectedStatementStatistics, +) error { + return s.cfg.KvDB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + // Explicitly copy the stats so that this closure is retryable. + scopedStats := *stats + + aggregatedTs := s.computeAggregatedTs() + serializedFingerprintID := sqlstatsutil.EncodeUint64ToBytes(uint64(scopedStats.ID)) + + insertFn := func(ctx context.Context, txn *kv.Txn) (alreadyExists bool, err error) { + rowsAffected, err := s.insertStatementStats(ctx, txn, aggregatedTs, serializedFingerprintID, &scopedStats) + + if err != nil { + return false /* alreadyExists */, err + } + + if rowsAffected == 0 { + return true /* alreadyExists */, nil /* err */ + } + + return false /* alreadyExists */, nil /* err */ + } + + readFn := func(ctx context.Context, txn *kv.Txn) error { + persistedData := roachpb.StatementStatistics{} + err := s.fetchPersistedStatementStats(ctx, txn, aggregatedTs, serializedFingerprintID, &scopedStats.Key, &persistedData) + if err != nil { + return err + } + + scopedStats.Stats.Add(&persistedData) + return nil + } + + updateFn := func(ctx context.Context, txn *kv.Txn) error { + return s.updateStatementStats(ctx, txn, aggregatedTs, serializedFingerprintID, &scopedStats) + } + + err := s.doInsertElseDoUpdate(ctx, txn, insertFn, readFn, updateFn) + if err != nil { + return errors.Wrapf(err, "flush statement %d's statistics", scopedStats.ID) + } + return nil + }) +} + +func (s *PersistedSQLStats) doInsertElseDoUpdate( + ctx context.Context, + txn *kv.Txn, + insertFn func(context.Context, *kv.Txn) (alreadyExists bool, err error), + readFn func(context.Context, *kv.Txn) error, + updateFn func(context.Context, *kv.Txn) error, +) error { + alreadyExists, err := insertFn(ctx, txn) + if err != nil { + return err + } + + if alreadyExists { + err = readFn(ctx, txn) + if err != nil { + return err + } + + err = updateFn(ctx, txn) + if err != nil { + return err + } + } + + return nil +} + +func (s *PersistedSQLStats) computeAggregatedTs() time.Time { + interval := SQLStatsFlushInterval.Get(&s.cfg.Settings.SV) + now := s.getTimeNow() + + aggTs := now.Truncate(interval) + + return aggTs +} + +func (s *PersistedSQLStats) getTimeNow() time.Time { + if s.cfg.Knobs != nil && s.cfg.Knobs.StubTimeNow != nil { + return s.cfg.Knobs.StubTimeNow() + } + + return timeutil.Now() +} + +func (s *PersistedSQLStats) insertTransactionStats( + ctx context.Context, + txn *kv.Txn, + aggregatedTs time.Time, + serializedFingerprintID []byte, + stats *roachpb.CollectedTransactionStatistics, +) (rowsAffected int, err error) { + insertStmt := ` +INSERT INTO system.transaction_statistics +VALUES ($1, $2, $3, $4, $5, $6, $7) +ON CONFLICT (crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_shard_8, aggregated_ts, fingerprint_id, app_name, node_id) +DO NOTHING +` + + aggInterval := SQLStatsFlushInterval.Get(&s.cfg.Settings.SV) + + // Prepare data for insertion. + metadataJSON, err := sqlstatsutil.BuildTxnMetadataJSON(stats) + if err != nil { + return 0 /* rowsAffected */, err + } + metadata := tree.NewDJSON(metadataJSON) + + statisticsJSON, err := sqlstatsutil.BuildTxnStatisticsJSON(stats) + if err != nil { + return 0 /* rowsAffected */, err + } + statistics := tree.NewDJSON(statisticsJSON) + + rowsAffected, err = s.cfg.InternalExecutor.ExecEx( + ctx, + "insert-txn-stats", + txn, /* txn */ + sessiondata.InternalExecutorOverride{ + User: security.NodeUserName(), + }, + insertStmt, + aggregatedTs, // aggregated_ts + serializedFingerprintID, // fingerprint_id + stats.App, // app_name + s.cfg.SQLIDContainer.SQLInstanceID(), // node_id + aggInterval, // agg_interval + metadata, // metadata + statistics, // statistics + ) + + return rowsAffected, err +} +func (s *PersistedSQLStats) updateTransactionStats( + ctx context.Context, + txn *kv.Txn, + aggregatedTs time.Time, + serializedFingerprintID []byte, + stats *roachpb.CollectedTransactionStatistics, +) error { + updateStmt := ` +UPDATE system.transaction_statistics +SET statistics = $1 +WHERE fingerprint_id = $2 + AND aggregated_ts = $3 + AND app_name = $4 + AND node_id = $5 +` + + statisticsJSON, err := sqlstatsutil.BuildTxnStatisticsJSON(stats) + if err != nil { + return err + } + statistics := tree.NewDJSON(statisticsJSON) + + rowsAffected, err := s.cfg.InternalExecutor.ExecEx( + ctx, + "update-stmt-stats", + txn, /* txn */ + sessiondata.InternalExecutorOverride{ + User: security.NodeUserName(), + }, + updateStmt, + statistics, // statistics + serializedFingerprintID, // fingerprint_id + aggregatedTs, // aggregated_ts + stats.App, // app_name + s.cfg.SQLIDContainer.SQLInstanceID(), // node_id + ) + + if err != nil { + return err + } + + if rowsAffected == 0 { + return errors.AssertionFailedf("failed to update transaction statistics for fingerprint_id: %s, app: %s, aggregated_ts: %s, node_id: %d", + serializedFingerprintID, stats.App, aggregatedTs, + s.cfg.SQLIDContainer.SQLInstanceID()) + } + + return nil +} + +func (s *PersistedSQLStats) updateStatementStats( + ctx context.Context, + txn *kv.Txn, + aggregatedTs time.Time, + serializedFingerprintID []byte, + stats *roachpb.CollectedStatementStatistics, +) error { + updateStmt := ` +UPDATE system.statement_statistics +SET statistics = $1 +WHERE fingerprint_id = $2 + AND aggregated_ts = $3 + AND app_name = $4 + AND plan_hash = $5 + AND node_id = $6 +` + + statisticsJSON, err := sqlstatsutil.BuildStmtStatisticsJSON(&stats.Stats) + if err != nil { + return err + } + statistics := tree.NewDJSON(statisticsJSON) + + rowsAffected, err := s.cfg.InternalExecutor.ExecEx( + ctx, + "update-stmt-stats", + txn, /* txn */ + sessiondata.InternalExecutorOverride{ + User: security.NodeUserName(), + }, + updateStmt, + statistics, // statistics + serializedFingerprintID, // fingerprint_id + aggregatedTs, // aggregated_ts + stats.Key.App, // app_name + dummyPlanHash, // plan_id + s.cfg.SQLIDContainer.SQLInstanceID(), // node_id + ) + + if err != nil { + return err + } + + if rowsAffected == 0 { + return errors.AssertionFailedf("failed to update statement statistics fo fingerprint_id: %s, app: %s, aggregated_ts: %s, plan_hash: %d, node_id: %d", + serializedFingerprintID, stats.Key.App, aggregatedTs, dummyPlanHash, + s.cfg.SQLIDContainer.SQLInstanceID()) + } + + return nil +} + +func (s *PersistedSQLStats) insertStatementStats( + ctx context.Context, + txn *kv.Txn, + aggregatedTs time.Time, + serializedFingerprintID []byte, + stats *roachpb.CollectedStatementStatistics, +) (rowsAffected int, err error) { + insertStmt := ` +INSERT INTO system.statement_statistics +VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) +ON CONFLICT (crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_plan_hash_shard_8, aggregated_ts, fingerprint_id, app_name, plan_hash, node_id) +DO NOTHING +` + aggInterval := SQLStatsFlushInterval.Get(&s.cfg.Settings.SV) + + // Prepare data for insertion. + metadataJSON, err := sqlstatsutil.BuildStmtMetadataJSON(stats) + if err != nil { + return 0 /* rowsAffected */, err + } + metadata := tree.NewDJSON(metadataJSON) + + statisticsJSON, err := sqlstatsutil.BuildStmtStatisticsJSON(&stats.Stats) + if err != nil { + return 0 /* rowsAffected */, err + } + statistics := tree.NewDJSON(statisticsJSON) + + plan := tree.NewDJSON(sqlstatsutil.ExplainTreePlanNodeToJSON(&stats.Stats.SensitiveInfo.MostRecentPlanDescription)) + + rowsAffected, err = s.cfg.InternalExecutor.ExecEx( + ctx, + "insert-stmt-stats", + txn, /* txn */ + sessiondata.InternalExecutorOverride{ + User: security.NodeUserName(), + }, + insertStmt, + aggregatedTs, // aggregated_ts + serializedFingerprintID, // fingerprint_id + dummyPlanHash, // plan_hash + stats.Key.App, // app_name + s.cfg.SQLIDContainer.SQLInstanceID(), // node_id + aggInterval, // agg_internal + metadata, // metadata + statistics, // statistics + plan, // plan + ) + + return rowsAffected, err +} + +func (s *PersistedSQLStats) fetchPersistedTransactionStats( + ctx context.Context, + txn *kv.Txn, + aggregatedTs time.Time, + serializedFingerprintID []byte, + appName string, + result *roachpb.TransactionStatistics, +) error { + // We use `SELECT ... FOR UPDATE` statement because we are going to perform + // and `UPDATE` on the stats for the given fingerprint later. + readStmt := ` +SELECT + statistics +FROM + system.transaction_statistics +WHERE fingerprint_id = $1 + AND app_name = $2 + AND aggregated_ts = $3 + AND node_id = $4 +FOR UPDATE +` + + row, err := s.cfg.InternalExecutor.QueryRowEx( + ctx, + "fetch-txn-stats", + txn, /* txn */ + sessiondata.InternalExecutorOverride{ + User: security.NodeUserName(), + }, + readStmt, // stmt + serializedFingerprintID, // fingerprint_id + appName, // app_name + aggregatedTs, // aggregated_ts + s.cfg.SQLIDContainer.SQLInstanceID(), // node_id + ) + + if err != nil { + return err + } + + if row == nil { + return errors.AssertionFailedf("transaction statistics not found for fingerprint_id: %s, app: %s, aggregated_ts: %s, node_id: %d", + serializedFingerprintID, appName, aggregatedTs, + s.cfg.SQLIDContainer.SQLInstanceID()) + } + + if len(row) != 1 { + return errors.AssertionFailedf("unexpectedly found %d returning columns for fingerprint_id: %s, app: %s, aggregated_ts: %s, node_id: %d", + len(row), serializedFingerprintID, appName, aggregatedTs, + s.cfg.SQLIDContainer.SQLInstanceID()) + } + + statistics := tree.MustBeDJSON(row[0]) + return sqlstatsutil.DecodeTxnStatsStatisticsJSON(statistics.JSON, result) +} + +func (s *PersistedSQLStats) fetchPersistedStatementStats( + ctx context.Context, + txn *kv.Txn, + aggregatedTs time.Time, + serializedFingerprintID []byte, + key *roachpb.StatementStatisticsKey, + result *roachpb.StatementStatistics, +) error { + readStmt := ` +SELECT + statistics +FROM + system.statement_statistics +WHERE fingerprint_id = $1 + AND app_name = $2 + AND aggregated_ts = $3 + AND plan_hash = $4 + AND node_id = $5 +FOR UPDATE +` + row, err := s.cfg.InternalExecutor.QueryRowEx( + ctx, + "fetch-stmt-stats", + txn, /* txn */ + sessiondata.InternalExecutorOverride{ + User: security.NodeUserName(), + }, + readStmt, // stmt + serializedFingerprintID, // fingerprint_id + key.App, // app_name + aggregatedTs, // aggregated_ts + dummyPlanHash, // plan_hash + s.cfg.SQLIDContainer.SQLInstanceID(), // node_id + ) + + if err != nil { + return err + } + + if row == nil { + return errors.AssertionFailedf( + "statement statistics not found fingerprint_id: %s, app: %s, aggregated_ts: %s, plan_hash: %d, node_id: %d", + serializedFingerprintID, key.App, aggregatedTs, dummyPlanHash, s.cfg.SQLIDContainer.SQLInstanceID()) + } + + if len(row) != 1 { + return errors.AssertionFailedf("unexpectedly found %d returning columns for fingerprint_id: %s, app: %s, aggregated_ts: %s, plan_hash %d, node_id: %d", + len(row), serializedFingerprintID, key.App, aggregatedTs, dummyPlanHash, + s.cfg.SQLIDContainer.SQLInstanceID()) + } + + statistics := tree.MustBeDJSON(row[0]) + + return sqlstatsutil.DecodeStmtStatsStatisticsJSON(statistics.JSON, result) +} diff --git a/pkg/sql/sqlstats/persistedsqlstats/flush_test.go b/pkg/sql/sqlstats/persistedsqlstats/flush_test.go new file mode 100644 index 000000000000..1e1a1de33737 --- /dev/null +++ b/pkg/sql/sqlstats/persistedsqlstats/flush_test.go @@ -0,0 +1,363 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package persistedsqlstats_test + +import ( + "context" + gosql "database/sql" + "fmt" + "net/url" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/sqlstats" + "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/stretchr/testify/require" +) + +type testCase struct { + query string + fingerprint string + count int64 +} + +func TestSQLStatsFlush(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testCases := []testCase{ + { + query: "SELECT 1", + fingerprint: "SELECT _", + count: 3, + }, + { + query: "SELECT 1, 2, 3", + fingerprint: "SELECT _, _, _", + count: 10, + }, + { + query: "SELECT 1, 1 WHERE 1 < 10", + fingerprint: "SELECT _, _ WHERE _ < _", + count: 7, + }, + } + + fakeTime := stubTime{ + aggInterval: time.Hour, + } + fakeTime.setTime(timeutil.Now()) + + testCluster := serverutils.StartNewTestCluster(t, 3 /* numNodes */, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + SQLStatsKnobs: &persistedsqlstats.TestingKnobs{ + StubTimeNow: fakeTime.StubTimeNow, + }, + }, + }, + }) + + ctx := context.Background() + defer testCluster.Stopper().Stop(ctx) + + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + + firstServer := testCluster.Server(0 /* idx */) + secondServer := testCluster.Server(1 /* idx */) + + firstPgURL, firstServerConnCleanup := sqlutils.PGUrl( + t, firstServer.ServingSQLAddr(), "CreateConnections" /* prefix */, url.User(security.RootUser)) + defer firstServerConnCleanup() + + secondPgURL, secondServerConnCleanup := sqlutils.PGUrl( + t, secondServer.ServingSQLAddr(), "CreateConnections" /* prefix */, url.User(security.RootUser)) + defer secondServerConnCleanup() + + pgFirstSQLConn, err := gosql.Open("postgres", firstPgURL.String()) + require.NoError(t, err) + firstSQLConn := sqlutils.MakeSQLRunner(pgFirstSQLConn) + + pgSecondSQLConn, err := gosql.Open("postgres", secondPgURL.String()) + require.NoError(t, err) + secondSQLConn := sqlutils.MakeSQLRunner(pgSecondSQLConn) + + firstServerSQLStats := firstServer.SQLServer().(*sql.Server).GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats) + secondServerSQLStats := secondServer.SQLServer().(*sql.Server).GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats) + + defer func() { + err := pgFirstSQLConn.Close() + require.NoError(t, err) + err = pgSecondSQLConn.Close() + require.NoError(t, err) + }() + firstSQLConn.Exec(t, "SET application_name = 'flush_unit_test'") + secondSQLConn.Exec(t, "SET application_name = 'flush_unit_test'") + require.NoError(t, err) + + // Regular inserts. + { + for _, tc := range testCases { + for i := int64(0); i < tc.count; i++ { + firstSQLConn.Exec(t, tc.query) + } + } + + verifyInMemoryStatsCorrectness(t, testCases, firstServerSQLStats) + verifyInMemoryStatsEmpty(t, testCases, secondServerSQLStats) + + firstServerSQLStats.Flush(ctx) + secondServerSQLStats.Flush(ctx) + + verifyInMemoryStatsEmpty(t, testCases, firstServerSQLStats) + verifyInMemoryStatsEmpty(t, testCases, secondServerSQLStats) + + // For each test case, we verify that it's being properly inserted exactly + // once and it is exactly executed tc.count number of times. + for _, tc := range testCases { + verifyNumOfInsertedEntries(t, secondSQLConn, tc.fingerprint, firstServer.NodeID(), 1 /* expectedStmtEntryCnt */, 1 /* expectedTxnEntryCtn */) + verifyInsertedFingerprintExecCount(t, secondSQLConn, tc.fingerprint, fakeTime.getAggTimeTs(), firstServer.NodeID(), tc.count) + } + } + + // We insert the same data during the same aggregation window to ensure that + // no new entries will be created but the statistics is updated. + { + for i := range testCases { + // Increment the execution count. + testCases[i].count++ + for execCnt := int64(0); execCnt < testCases[i].count; execCnt++ { + firstSQLConn.Exec(t, testCases[i].query) + } + } + verifyInMemoryStatsCorrectness(t, testCases, firstServerSQLStats) + verifyInMemoryStatsEmpty(t, testCases, secondServerSQLStats) + + firstServerSQLStats.Flush(ctx) + secondServerSQLStats.Flush(ctx) + + verifyInMemoryStatsEmpty(t, testCases, firstServerSQLStats) + verifyInMemoryStatsEmpty(t, testCases, secondServerSQLStats) + + for _, tc := range testCases { + verifyNumOfInsertedEntries(t, secondSQLConn, tc.fingerprint, firstServer.NodeID(), 1 /* expectedStmtEntryCnt */, 1 /* expectedTxnEntryCtn */) + // The execution count is doubled here because we execute all of the + // statements here in the same aggregation interval. + verifyInsertedFingerprintExecCount(t, secondSQLConn, tc.fingerprint, fakeTime.getAggTimeTs(), firstServer.NodeID(), tc.count+tc.count-1 /* expectedCount */) + } + } + + // We change the time to be in a different aggregation window. + { + fakeTime.setTime(fakeTime.StubTimeNow().Add(time.Hour * 3)) + + for _, tc := range testCases { + for i := int64(0); i < tc.count; i++ { + firstSQLConn.Exec(t, tc.query) + } + } + verifyInMemoryStatsCorrectness(t, testCases, firstServerSQLStats) + verifyInMemoryStatsEmpty(t, testCases, secondServerSQLStats) + + firstServerSQLStats.Flush(ctx) + secondServerSQLStats.Flush(ctx) + + verifyInMemoryStatsEmpty(t, testCases, firstServerSQLStats) + verifyInMemoryStatsEmpty(t, testCases, secondServerSQLStats) + + for _, tc := range testCases { + // We expect exactly 2 entries since we are in a different aggregation window. + verifyNumOfInsertedEntries(t, secondSQLConn, tc.fingerprint, firstServer.NodeID(), 2 /* expectedStmtEntryCnt */, 2 /* expectedTxnEntryCtn */) + verifyInsertedFingerprintExecCount(t, secondSQLConn, tc.fingerprint, fakeTime.getAggTimeTs(), firstServer.NodeID(), tc.count) + } + } + + // We run queries in a different server and trigger the flush. + { + for _, tc := range testCases { + for i := int64(0); i < tc.count; i++ { + secondSQLConn.Exec(t, tc.query) + require.NoError(t, err) + } + } + verifyInMemoryStatsEmpty(t, testCases, firstServerSQLStats) + verifyInMemoryStatsCorrectness(t, testCases, secondServerSQLStats) + + firstServerSQLStats.Flush(ctx) + secondServerSQLStats.Flush(ctx) + + verifyInMemoryStatsEmpty(t, testCases, firstServerSQLStats) + verifyInMemoryStatsEmpty(t, testCases, secondServerSQLStats) + + // Ensure that we encode the correct node_id for the new entry and did not + // accidentally tamper the entries written by another server. + for _, tc := range testCases { + verifyNumOfInsertedEntries(t, firstSQLConn, tc.fingerprint, secondServer.NodeID(), 1 /* expectedStmtEntryCnt */, 1 /* expectedTxnEntryCtn */) + verifyInsertedFingerprintExecCount(t, firstSQLConn, tc.fingerprint, fakeTime.getAggTimeTs(), secondServer.NodeID(), tc.count) + verifyNumOfInsertedEntries(t, secondSQLConn, tc.fingerprint, firstServer.NodeID(), 2 /* expectedStmtEntryCnt */, 2 /* expectedTxnEntryCtn */) + verifyInsertedFingerprintExecCount(t, secondSQLConn, tc.fingerprint, fakeTime.getAggTimeTs(), firstServer.NodeID(), tc.count) + } + } +} + +type stubTime struct { + syncutil.RWMutex + t time.Time + aggInterval time.Duration +} + +func (s *stubTime) setTime(t time.Time) { + s.RWMutex.Lock() + defer s.RWMutex.Unlock() + s.t = t +} + +func (s *stubTime) getAggTimeTs() time.Time { + s.RWMutex.Lock() + defer s.RWMutex.Unlock() + return s.t.Truncate(s.aggInterval) +} + +// StubTimeNow implements the testing knob interface for persistedsqlstats.Provider. +func (s *stubTime) StubTimeNow() time.Time { + s.RWMutex.RLock() + defer s.RWMutex.RUnlock() + return s.t +} + +func verifyInsertedFingerprintExecCount( + t *testing.T, + sqlConn *sqlutils.SQLRunner, + fingerprint string, + ts time.Time, + nodeID roachpb.NodeID, + expectedCount int64, +) { + row := sqlConn.Query(t, + ` +SELECT + (S.statistics -> 'statistics' ->> 'cnt')::INT AS stmtCount, + (T.statistics -> 'statistics' ->> 'cnt')::INT AS txnCount +FROM + system.transaction_statistics T, + system.statement_statistics S +WHERE S.metadata ->> 'query' = $1 + AND T.aggregated_ts = $2 + AND T.node_id = $3 + AND T.app_name = 'flush_unit_test' + AND decode(T.metadata -> 'stmtFingerprintIDs' ->> 0, 'hex') = S.fingerprint_id + AND S.node_id = T.node_id + AND S.aggregated_ts = T.aggregated_ts + AND S.app_name = T.app_name +`, fingerprint, ts, nodeID) + + require.True(t, row.Next(), "no stats found for fingerprint: %s", fingerprint) + + var actualTxnExecCnt int64 + var actualStmtExecCnt int64 + err := row.Scan(&actualStmtExecCnt, &actualTxnExecCnt) + require.NoError(t, err) + require.Equal(t, expectedCount, actualStmtExecCnt, "fingerprint: %s", fingerprint) + require.Equal(t, expectedCount, actualTxnExecCnt, "fingerprint: %s", fingerprint) + require.False(t, row.Next(), "more than one rows found for fingerprint: %s", fingerprint) + require.NoError(t, row.Close()) +} + +func verifyNumOfInsertedEntries( + t *testing.T, + sqlConn *sqlutils.SQLRunner, + fingerprint string, + nodeID roachpb.NodeID, + expectedStmtEntryCnt, expectedTxnEntryCnt int64, +) { + row2 := sqlConn.DB.QueryRowContext(context.Background(), + ` +SELECT + encode(fingerprint_id, 'hex'), + count(*) +FROM + system.statement_statistics +WHERE + metadata ->> 'query' = $1 AND + node_id = $2 AND + app_name = 'flush_unit_test' +GROUP BY + (fingerprint_id, node_id) +`, fingerprint, nodeID) + + var stmtFingerprintID string + var numOfInsertedStmtEntry int64 + + e := row2.Scan(&stmtFingerprintID, &numOfInsertedStmtEntry) + require.NoError(t, e) + require.Equal(t, expectedStmtEntryCnt, numOfInsertedStmtEntry, "fingerprint: %s", fingerprint) + + row1 := sqlConn.DB.QueryRowContext(context.Background(), fmt.Sprintf( + ` +SELECT + count(*) +FROM + system.transaction_statistics +WHERE + (metadata -> 'stmtFingerprintIDs' ->> 0) = '%s' AND + node_id = $1 AND + app_name = 'flush_unit_test' +GROUP BY + (fingerprint_id, node_id) +`, stmtFingerprintID), nodeID) + + var numOfInsertedTxnEntry int64 + err := row1.Scan(&numOfInsertedTxnEntry) + require.NoError(t, err) + require.Equal(t, expectedTxnEntryCnt, numOfInsertedTxnEntry, "fingerprint: %s", fingerprint) +} + +func verifyInMemoryStatsCorrectness( + t *testing.T, tcs []testCase, statsProvider *persistedsqlstats.PersistedSQLStats, +) { + for _, tc := range tcs { + err := statsProvider.IterateStatementStats(context.Background(), &sqlstats.IteratorOptions{}, func(ctx context.Context, statistics *roachpb.CollectedStatementStatistics) error { + if tc.fingerprint == statistics.Key.Query { + require.Equal(t, tc.count, statistics.Stats.Count, "fingerprint: %s", tc.fingerprint) + } + return nil + }) + + require.NoError(t, err) + } +} + +func verifyInMemoryStatsEmpty( + t *testing.T, tcs []testCase, statsProvider *persistedsqlstats.PersistedSQLStats, +) { + for _, tc := range tcs { + err := statsProvider.IterateStatementStats(context.Background(), &sqlstats.IteratorOptions{}, func(ctx context.Context, statistics *roachpb.CollectedStatementStatistics) error { + if tc.fingerprint == statistics.Key.Query { + require.Equal(t, 0 /* expected */, statistics.Stats.Count, "fingerprint: %s", tc.fingerprint) + } + return nil + }) + + require.NoError(t, err) + } +} diff --git a/pkg/sql/sqlstats/persistedsqlstats/main_test.go b/pkg/sql/sqlstats/persistedsqlstats/main_test.go new file mode 100644 index 000000000000..054cb0471b64 --- /dev/null +++ b/pkg/sql/sqlstats/persistedsqlstats/main_test.go @@ -0,0 +1,29 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package persistedsqlstats_test + +import ( + "os" + "testing" + + "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/security/securitytest" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" +) + +func TestMain(m *testing.M) { + security.SetAssetLoader(securitytest.EmbeddedAssets) + serverutils.InitTestServerFactory(server.TestServerFactory) + serverutils.InitTestClusterFactory(testcluster.TestClusterFactory) + os.Exit(m.Run()) +} diff --git a/pkg/sql/sqlstats/persistedsqlstats/provider.go b/pkg/sql/sqlstats/persistedsqlstats/provider.go new file mode 100644 index 000000000000..e64f7d8ddc37 --- /dev/null +++ b/pkg/sql/sqlstats/persistedsqlstats/provider.go @@ -0,0 +1,74 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. +// +// persistedsqlstats is a subsystem that is responsible for flushing node-local +// in-memory stats into persisted system tables. + +package persistedsqlstats + +import ( + "context" + "time" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/sqlstats" + "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/sslocal" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" + "github.com/cockroachdb/cockroach/pkg/util/metric" + "github.com/cockroachdb/cockroach/pkg/util/stop" +) + +// TODO(azhng): currently we do not have the ability to compute a hash for +// query plan. This is currently being worked on by the SQL Queries team. +// Once we are able get consistent hash value from a query plan, we should +// update this. +const dummyPlanHash = int64(0) + +// Config is a configuration struct for the persisted SQL stats subsystem. +type Config struct { + Settings *cluster.Settings + InternalExecutor sqlutil.InternalExecutor + KvDB *kv.DB + SQLIDContainer *base.SQLIDContainer + Knobs *TestingKnobs + FlushCounter *metric.Counter + FlushDuration *metric.Histogram + FailureCounter *metric.Counter +} + +// PersistedSQLStats is a sqlstats.Provider that wraps a node-local in-memory +// sslocal.SQLStats. It behaves similar to a sslocal.SQLStats. However, it +// periodically writes the in-memory SQL stats into system table for +// persistence. It also performs the flush operation if it detects memory +// pressure. +type PersistedSQLStats struct { + *sslocal.SQLStats + + cfg *Config + + lastFlushStarted time.Time +} + +var _ sqlstats.Provider = &PersistedSQLStats{} + +// New returns a new instance of the PersistedSQLStats. +func New(cfg *Config, memSQLStats *sslocal.SQLStats) *PersistedSQLStats { + return &PersistedSQLStats{ + SQLStats: memSQLStats, + cfg: cfg, + } +} + +// Start implements sqlstats.Provider interface. +func (s *PersistedSQLStats) Start(ctx context.Context, stopper *stop.Stopper) { + s.SQLStats.Start(ctx, stopper) +} diff --git a/pkg/sql/sqlstats/persistedsqlstats/test_utils.go b/pkg/sql/sqlstats/persistedsqlstats/test_utils.go new file mode 100644 index 000000000000..ed49d099059b --- /dev/null +++ b/pkg/sql/sqlstats/persistedsqlstats/test_utils.go @@ -0,0 +1,27 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package persistedsqlstats + +import "time" + +// TestingKnobs provides hooks and knobs for unit tests. +type TestingKnobs struct { + // OnStatsFlushFinished is a callback that is triggered when a single + // statistics object is flushed. + OnStatsFlushFinished func(error) + + // StubTimeNow allows tests to override the timeutil.Now() function used + // by the flush operation to calculate aggregated_ts timestamp. + StubTimeNow func() time.Time +} + +// ModuleTestingKnobs implements base.ModuleTestingKnobs interface. +func (*TestingKnobs) ModuleTestingKnobs() {} diff --git a/pkg/sql/sqlstats/sslocal/sql_stats.go b/pkg/sql/sqlstats/sslocal/sql_stats.go index f73822ac2361..57b1a174d68d 100644 --- a/pkg/sql/sqlstats/sslocal/sql_stats.go +++ b/pkg/sql/sqlstats/sslocal/sql_stats.go @@ -13,6 +13,7 @@ package sslocal import ( "context" "math" + "sync/atomic" "time" "github.com/cockroachdb/cockroach/pkg/settings" @@ -100,6 +101,21 @@ func newSQLStats( return s } +// GetTotalFingerprintCount returns total number of unique statement and +// transaction fingerprints stored in the currnet SQLStats. +func (s *SQLStats) GetTotalFingerprintCount() int64 { + return atomic.LoadInt64(&s.atomic.uniqueStmtFingerprintCount) + atomic.LoadInt64(&s.atomic.uniqueTxnFingerprintCount) +} + +// GetTotalFingerprintBytes returns the total amount of bytes currently +// allocated for storing statistics for both statement and transaction +// fingerprints. +func (s *SQLStats) GetTotalFingerprintBytes() int64 { + s.mu.Lock() + defer s.mu.Unlock() + return s.mu.mon.AllocBytes() +} + func (s *SQLStats) getStatsForApplication(appName string) *ssmemstorage.Container { s.mu.Lock() defer s.mu.Unlock() diff --git a/pkg/sql/sqlstats/sslocal/sslocal_provider.go b/pkg/sql/sqlstats/sslocal/sslocal_provider.go index 15ea964e8588..10d1affd533e 100644 --- a/pkg/sql/sqlstats/sslocal/sslocal_provider.go +++ b/pkg/sql/sqlstats/sslocal/sslocal_provider.go @@ -125,14 +125,14 @@ func (s *SQLStats) GetLastReset() time.Time { // IterateStatementStats implements sqlstats.Provider interface. func (s *SQLStats) IterateStatementStats( - _ context.Context, options *sqlstats.IteratorOptions, visitor sqlstats.StatementVisitor, + ctx context.Context, options *sqlstats.IteratorOptions, visitor sqlstats.StatementVisitor, ) error { appNames := s.getAppNames(options.SortedAppNames) for _, appName := range appNames { statsContainer := s.getStatsForApplication(appName) - err := statsContainer.IterateStatementStats(appName, options.SortedKey, visitor) + err := statsContainer.IterateStatementStats(ctx, appName, options.SortedKey, visitor) if err != nil { return fmt.Errorf("sql stats iteration abort: %s", err) } @@ -142,14 +142,14 @@ func (s *SQLStats) IterateStatementStats( // IterateTransactionStats implements sqlstats.Provider interface. func (s *SQLStats) IterateTransactionStats( - _ context.Context, options *sqlstats.IteratorOptions, visitor sqlstats.TransactionVisitor, + ctx context.Context, options *sqlstats.IteratorOptions, visitor sqlstats.TransactionVisitor, ) error { appNames := s.getAppNames(options.SortedAppNames) for _, appName := range appNames { statsContainer := s.getStatsForApplication(appName) - err := statsContainer.IterateTransactionStats(appName, options.SortedKey, visitor) + err := statsContainer.IterateTransactionStats(ctx, appName, options.SortedKey, visitor) if err != nil { return fmt.Errorf("sql stats iteration abort: %s", err) } diff --git a/pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go b/pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go index dcfeb62f48e4..3cda730e5d8b 100644 --- a/pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go +++ b/pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go @@ -128,7 +128,7 @@ func New( // IterateStatementStats iterates through the stored statement statistics // stored in this Container. func (s *Container) IterateStatementStats( - appName string, orderedKey bool, visitor sqlstats.StatementVisitor, + ctx context.Context, appName string, orderedKey bool, visitor sqlstats.StatementVisitor, ) error { var stmtKeys stmtList s.mu.Lock() @@ -176,7 +176,7 @@ func (s *Container) IterateStatementStats( Stats: data, } - err := visitor(&collectedStats) + err := visitor(ctx, &collectedStats) if err != nil { return fmt.Errorf("sql stats iteration abort: %s", err) } @@ -187,7 +187,7 @@ func (s *Container) IterateStatementStats( // IterateTransactionStats iterates through the stored transaction statistics // stored in this Container. func (s *Container) IterateTransactionStats( - appName string, orderedKey bool, visitor sqlstats.TransactionVisitor, + ctx context.Context, appName string, orderedKey bool, visitor sqlstats.TransactionVisitor, ) error { // Retrieve the transaction keys and optionally sort them. var txnKeys txnList @@ -221,7 +221,7 @@ func (s *Container) IterateTransactionStats( } txnStats.mu.Unlock() - err := visitor(txnKey, &collectedStats) + err := visitor(ctx, txnKey, &collectedStats) if err != nil { return fmt.Errorf("sql stats iteration abort: %s", err) } diff --git a/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go b/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go index f32419ed9ced..f5fab1b8ebcc 100644 --- a/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go +++ b/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go @@ -21,6 +21,21 @@ import ( "github.com/cockroachdb/errors" ) +var ( + // ErrMemoryPressure is returned from the Container when we have reached + // the memory limit allowed. + ErrMemoryPressure = errors.New("insufficient sql stats memory") + + // ErrFingerprintLimitReached is returned from the Container when we have + // more fingerprints than the limit specified in the cluster setting. + ErrFingerprintLimitReached = errors.New("sql stats fingerprint limit reached") + + // ErrExecStatsFingerprintFlushed is returned from the Container when the + // stats object for the fingerprint has been flushed to system table before + // the roachpb.ExecStats can be recorded. + ErrExecStatsFingerprintFlushed = errors.New("stmtStats flushed before execution stats can be recorded") +) + var _ sqlstats.Writer = &Container{} // RecordStatement implements sqlstats.Writer interface. @@ -59,7 +74,7 @@ func (s *Container) RecordStatement( // This means we have reached the limit of unique fingerprintstats. We don't // record anything and abort the operation. if throttled { - return stmtFingerprintID, errors.New("unique fingerprint limit has been reached") + return stmtFingerprintID, ErrFingerprintLimitReached } // This statement was below the latency threshold or sql stats aren't being @@ -118,7 +133,7 @@ func (s *Container) RecordStatement( // memory budget, delete the entry that we just created and report the error. if err := s.mu.acc.Grow(ctx, estimatedMemoryAllocBytes); err != nil { delete(s.mu.stmts, statementKey) - return stats.ID, err + return stats.ID, ErrMemoryPressure } } @@ -132,7 +147,7 @@ func (s *Container) RecordStatementExecStats( stmtStats, _, _, _, _ := s.getStatsForStmt(key.Query, key.ImplicitTxn, key.Database, key.Failed, false /* createIfNotExists */) if stmtStats == nil { - return errors.New("stmtStats flushed before execution stats can be recorded") + return ErrExecStatsFingerprintFlushed } stmtStats.recordExecStats(stats) return nil @@ -169,7 +184,7 @@ func (s *Container) RecordTransaction( stats, created, throttled := s.getStatsForTxnWithKey(key, value.StatementFingerprintIDs, true /* createIfNonexistent */) if throttled { - return errors.New("unique fingerprint limit has been reached") + return ErrFingerprintLimitReached } // Collect the per-transaction statistics. @@ -188,7 +203,7 @@ func (s *Container) RecordTransaction( if err := s.mu.acc.Grow(ctx, estimatedMemAllocBytes); err != nil { delete(s.mu.txns, key) s.mu.Unlock() - return err + return ErrMemoryPressure } s.mu.Unlock() } diff --git a/pkg/sql/sqlstats/ssprovider.go b/pkg/sql/sqlstats/ssprovider.go index 3619bceeb136..4146e56d5633 100644 --- a/pkg/sql/sqlstats/ssprovider.go +++ b/pkg/sql/sqlstats/ssprovider.go @@ -100,12 +100,12 @@ type IteratorOptions struct { // StatementVisitor is the callback that is invoked when caller iterate through // all statement statistics using IterateStatementStats(). If an error is // encountered when calling the visitor, the iteration is aborted. -type StatementVisitor func(*roachpb.CollectedStatementStatistics) error +type StatementVisitor func(context.Context, *roachpb.CollectedStatementStatistics) error // TransactionVisitor is the callback that is invoked when caller iterate through // all transaction statistics using IterateTransactionStats(). If an error is // encountered when calling the visitor, the iteration is aborted. -type TransactionVisitor func(roachpb.TransactionFingerprintID, *roachpb.CollectedTransactionStatistics) error +type TransactionVisitor func(context.Context, roachpb.TransactionFingerprintID, *roachpb.CollectedTransactionStatistics) error // AggregatedTransactionVisitor is the callback invoked when iterate through // transaction statistics collected at the application level using diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go index 7de59398935f..8175641e65ab 100644 --- a/pkg/ts/catalog/chart_catalog.go +++ b/pkg/ts/catalog/chart_catalog.go @@ -1785,6 +1785,30 @@ var charts = []sectionDescription{ Title: "Number of internal fingerprint statistics being discarded", Metrics: []string{"sql.stats.discarded.current.internal"}, }, + { + Title: "Number of times SQL Stats are flushed to persistent storage", + Metrics: []string{"sql.stats.flush.count"}, + }, + { + Title: "Number of errors encountered when flushing SQL Stats", + Metrics: []string{"sql.stats.flush.error"}, + }, + { + Title: "Time took to complete SQL Stats flush", + Metrics: []string{"sql.stats.flush.duration"}, + }, + { + Title: "Number of times internal SQL Stats are flushed to persistent storage", + Metrics: []string{"sql.stats.flush.count.internal"}, + }, + { + Title: "Number of errors encountered when flushing internal SQL Stats", + Metrics: []string{"sql.stats.flush.error.internal"}, + }, + { + Title: "Time took to complete internal SQL Stats flush", + Metrics: []string{"sql.stats.flush.duration.internal"}, + }, }, }, {