Merge #96396
96396: sqlstats, sql: add latency info to statement statistics r=maryliag a=maryliag

This commit adds latency info in seconds to the statement
statistics on `crdb_internal.statement_statistics`,
`system.statement_statistics`, and
`crdb_internal.cluster_statement_statistics`,
with information about min, max, p50, p90, and p99.

It also adds the following columns to `crdb_internal.node_statement_statistics`:
`latency_seconds_min`, `latency_seconds_max`, `latency_seconds_p50`,
`latency_seconds_p90`, and `latency_seconds_p99`.

Part Of #72954

This initial version leverages the latency summary that already
exists in Insights. Since we were already collecting that
information for every statement that executed for at least
`AnomalyDetectionLatencyThreshold`, this commit makes that
mapping available so SQLStats can use it to retrieve the values.
The result is an estimate, since executions that took less than
the threshold (50ms by default) are missing.
We were also collecting only p50, p90, and p99, so the function
that returns the values enforces that limitation, in case anyone
tries to use it to retrieve other percentiles.

A follow-up task will focus on making this information more complete.
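
For context, here is a minimal sketch of the targeted-quantile stream that backs this summary. It assumes the `pkg/util/quantile` API visible in the detector diff below (`NewTargeted`, `Query`); the sample latencies and the `Insert` loop are illustrative, not code from this commit:
```go
package main

import (
	"fmt"

	// Targeted-quantile stream used by the Insights anomaly detector
	// (CRDB's fork of github.com/beorn7/perks/quantile).
	"github.com/cockroachdb/cockroach/pkg/util/quantile"
)

func main() {
	// Mirrors desiredQuantiles in detector.go: track p50, p90, and p99,
	// each with its own allowed estimation error.
	s := quantile.NewTargeted(map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001})

	// Feed in observed statement latencies, in seconds.
	for _, lat := range []float64{0.101, 0.2, 0.6, 0.9, 1.0, 3.05} {
		s.Insert(lat)
	}

	// Only the registered quantiles carry an accuracy guarantee; asking for
	// any other percentile (say 0.75) returns an estimate with no error
	// bound, which is why the new accessor is limited to p50/p90/p99.
	fmt.Println(s.Query(0.5), s.Query(0.9), s.Query(0.99))
}
```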

Example of the `statistics` column for `SELECT pg_sleep(x)` with various values of x between 0.1 and 3:
```
{
	"execution_statistics": {
		...
	},
	"index_recommendations": [],
	"statistics": {
		...
		"latencyInfo": {
			"max": 3.050540917,
			"min": 0.101273666,
			"p50": 0.600609374,
			"p90": 1.000914625,
			"p99": 3.001826416
		}
	}
}
```

Example of new columns on node table:
```
...
latency_seconds_min           | 0.201089583
latency_seconds_max           | 3.200554126
latency_seconds_p50           | 0.700803875
latency_seconds_p90           | 1.501616292
latency_seconds_p99           | 3.200554126
```

Release note (sql change): Added latency info (min, max, p50, p90, p99)
to `crdb_internal.statement_statistics`, `system.statement_statistics`, and
`crdb_internal.cluster_statement_statistics`. Also added `latency_seconds_min`,
`latency_seconds_max`, `latency_seconds_p50`, `latency_seconds_p90`, and `latency_seconds_p99` to
`crdb_internal.node_statement_statistics`.

Co-authored-by: maryliag <[email protected]>
craig[bot] and maryliag committed Feb 7, 2023
2 parents 8022f2a + 87156d2 commit f8a7f49
Showing 23 changed files with 408 additions and 32 deletions.
4 changes: 2 additions & 2 deletions pkg/ccl/logictestccl/testdata/logic_test/crdb_internal_tenant
@@ -194,10 +194,10 @@ SELECT * FROM crdb_internal.leases WHERE node_id < 0
----
node_id table_id name parent_id expiration deleted

query ITTTTTIIITRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRBBTTTTT colnames
query ITTTTTIIITRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRBBTTTTTRRRRR colnames
SELECT * FROM crdb_internal.node_statement_statistics WHERE node_id < 0
----
node_id application_name flags statement_id key anonymized count first_attempt_count max_retries last_error rows_avg rows_var idle_lat_avg idle_lat_var parse_lat_avg parse_lat_var plan_lat_avg plan_lat_var run_lat_avg run_lat_var service_lat_avg service_lat_var overhead_lat_avg overhead_lat_var bytes_read_avg bytes_read_var rows_read_avg rows_read_var rows_written_avg rows_written_var network_bytes_avg network_bytes_var network_msgs_avg network_msgs_var max_mem_usage_avg max_mem_usage_var max_disk_usage_avg max_disk_usage_var contention_time_avg contention_time_var cpu_sql_nanos_avg cpu_sql_nanos_var implicit_txn full_scan sample_plan database_name exec_node_ids txn_fingerprint_id index_recommendations
node_id application_name flags statement_id key anonymized count first_attempt_count max_retries last_error rows_avg rows_var idle_lat_avg idle_lat_var parse_lat_avg parse_lat_var plan_lat_avg plan_lat_var run_lat_avg run_lat_var service_lat_avg service_lat_var overhead_lat_avg overhead_lat_var bytes_read_avg bytes_read_var rows_read_avg rows_read_var rows_written_avg rows_written_var network_bytes_avg network_bytes_var network_msgs_avg network_msgs_var max_mem_usage_avg max_mem_usage_var max_disk_usage_avg max_disk_usage_var contention_time_avg contention_time_var cpu_sql_nanos_avg cpu_sql_nanos_var implicit_txn full_scan sample_plan database_name exec_node_ids txn_fingerprint_id index_recommendations latency_seconds_min latency_seconds_max latency_seconds_p50 latency_seconds_p90 latency_seconds_p99

query ITTTIIRRRRRRRRRRRRRRRRRRRRRR colnames
SELECT * FROM crdb_internal.node_transaction_statistics WHERE node_id < 0
5 changes: 5 additions & 0 deletions pkg/cli/zip_table_registry.go
@@ -755,6 +755,11 @@ var zipInternalTablesPerNode = DebugZipTableRegistry{
"exec_node_ids",
"txn_fingerprint_id",
"index_recommendations",
"latency_seconds_min",
"latency_seconds_max",
"latency_seconds_p50",
"latency_seconds_p90",
"latency_seconds_p99",
},
},
"crdb_internal.node_transaction_statistics": {
18 changes: 18 additions & 0 deletions pkg/sql/appstatspb/app_stats.go
@@ -161,6 +161,7 @@ func (s *StatementStatistics) Add(other *StatementStatistics) {
s.Indexes = util.CombineUniqueString(s.Indexes, other.Indexes)

s.ExecStats.Add(other.ExecStats)
s.LatencyInfo.Add(other.LatencyInfo)

if other.SensitiveInfo.LastErr != "" {
s.SensitiveInfo.LastErr = other.SensitiveInfo.LastErr
@@ -217,3 +218,20 @@

s.Count += other.Count
}

// Add combines other into this LatencyInfo.
func (s *LatencyInfo) Add(other LatencyInfo) {
// Use the latest non-zero value.
if other.P50 != 0 {
s.P50 = other.P50
s.P90 = other.P90
s.P99 = other.P99
}

if s.Min == 0 || other.Min < s.Min {
s.Min = other.Min
}
if other.Max > s.Max {
s.Max = other.Max
}
}
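
To make the merge semantics above concrete, here is a self-contained sketch; the struct is a hand-written stand-in for the generated `appstatspb.LatencyInfo`, not the commit's code. Percentiles are replaced wholesale by the latest non-zero contributor rather than being statistically merged, consistent with the "estimate" caveat in the commit message, while Min and Max genuinely combine:
```go
package main

import "fmt"

// LatencyInfo mirrors the appstatspb message for illustration only.
type LatencyInfo struct {
	Min, Max, P50, P90, P99 float64
}

// Add mirrors the merge logic above: the latest non-zero percentile
// snapshot wins, while Min/Max accumulate across both sides.
func (s *LatencyInfo) Add(other LatencyInfo) {
	if other.P50 != 0 {
		s.P50, s.P90, s.P99 = other.P50, other.P90, other.P99
	}
	if s.Min == 0 || other.Min < s.Min {
		s.Min = other.Min
	}
	if other.Max > s.Max {
		s.Max = other.Max
	}
}

func main() {
	agg := LatencyInfo{Min: 0.2, Max: 1.0, P50: 0.5, P90: 0.9, P99: 0.99}
	agg.Add(LatencyInfo{Min: 0.1, Max: 3.0, P50: 0.6, P90: 1.0, P99: 2.9})
	fmt.Printf("%+v\n", agg) // {Min:0.1 Max:3 P50:0.6 P90:1 P99:2.9}
}
```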
27 changes: 24 additions & 3 deletions pkg/sql/appstatspb/app_stats.proto
@@ -108,18 +108,21 @@ message StatementStatistics {
// Nodes is the ordered list of nodes ids on which the statement was executed.
repeated int64 nodes = 24;

// plan_gists is the list of a compressed version of plan that can be converted (lossily)
// PlanGists is the list of a compressed version of plan that can be converted (lossily)
// back into a logical plan.
// Each statement contain only one plan gist, but the same statement fingerprint id
// can contain more than one value.
repeated string plan_gists = 26;

// index_recommendations is the list of index recommendations generated for the statement fingerprint.
// IndexRecommendations is the list of index recommendations generated for the statement fingerprint.
repeated string index_recommendations = 27;

// indexes is the list of indexes used by the particular plan when executing the statement.
// Indexes is the list of indexes used by the particular plan when executing the statement.
repeated string indexes = 30;

// LatencyInfo is the information about latency, such as min, max, p50, p90 and p99.
optional LatencyInfo latency_info = 31 [(gogoproto.nullable) = false];

// Note: be sure to update `sql/app_stats.go` when adding/removing fields here!

reserved 13, 14, 17, 18, 19, 20;
@@ -338,3 +341,21 @@ message ExecStats {
// Note: be sure to update `sql/app_stats.go` when adding/removing fields
// here!
}

// LatencyInfo contains more details about the latency.
message LatencyInfo {
// Min is the minimum time in seconds spent executing the fingerprint.
optional double min = 1 [(gogoproto.nullable) = false];

// Max is the maximum time in seconds spent executing the fingerprint.
optional double max = 2 [(gogoproto.nullable) = false];

// P50 is the 50th percentile in seconds for the fingerprint.
optional double p50 = 3 [(gogoproto.nullable) = false];

// P90 is the 90th percentile in seconds for the fingerprint.
optional double p90 = 4 [(gogoproto.nullable) = false];

// P99 is the 99th percentile in seconds for the fingerprint.
optional double p99 = 5 [(gogoproto.nullable) = false];
}
2 changes: 2 additions & 0 deletions pkg/sql/conn_executor.go
@@ -374,6 +374,7 @@ func NewServer(cfg *ExecutorConfig, pool *mon.BytesMonitor) *Server {
pool,
nil, /* reportedProvider */
cfg.SQLStatsTestingKnobs,
insightsProvider.LatencyInformation(),
)
reportedSQLStatsController := reportedSQLStats.GetController(cfg.SQLStatusServer)
memSQLStats := sslocal.New(
@@ -386,6 +387,7 @@ func NewServer(cfg *ExecutorConfig, pool *mon.BytesMonitor) *Server {
pool,
reportedSQLStats,
cfg.SQLStatsTestingKnobs,
insightsProvider.LatencyInformation(),
)
s := &Server{
cfg: cfg,
12 changes: 11 additions & 1 deletion pkg/sql/crdb_internal.go
@@ -1375,7 +1375,12 @@ CREATE TABLE crdb_internal.node_statement_statistics (
database_name STRING NOT NULL,
exec_node_ids INT[] NOT NULL,
txn_fingerprint_id STRING,
index_recommendations STRING[] NOT NULL
index_recommendations STRING[] NOT NULL,
latency_seconds_min FLOAT,
latency_seconds_max FLOAT,
latency_seconds_p50 FLOAT,
latency_seconds_p90 FLOAT,
latency_seconds_p99 FLOAT
)`,
populate: func(ctx context.Context, p *planner, _ catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) error {
hasViewActivityOrViewActivityRedacted, err := p.HasViewActivityOrViewActivityRedactedRole(ctx)
@@ -1485,6 +1490,11 @@ CREATE TABLE crdb_internal.node_statement_statistics (
execNodeIDs, // exec_node_ids
txnFingerprintID, // txn_fingerprint_id
indexRecommendations, // index_recommendations
tree.NewDFloat(tree.DFloat(stats.Stats.LatencyInfo.Min)), // latency_seconds_min
tree.NewDFloat(tree.DFloat(stats.Stats.LatencyInfo.Max)), // latency_seconds_max
tree.NewDFloat(tree.DFloat(stats.Stats.LatencyInfo.P50)), // latency_seconds_p50
tree.NewDFloat(tree.DFloat(stats.Stats.LatencyInfo.P90)), // latency_seconds_p90
tree.NewDFloat(tree.DFloat(stats.Stats.LatencyInfo.P99)), // latency_seconds_p99
)
if err != nil {
return err
4 changes: 2 additions & 2 deletions pkg/sql/logictest/testdata/logic_test/crdb_internal
@@ -326,10 +326,10 @@ SELECT * FROM crdb_internal.leases WHERE node_id < 0
----
node_id table_id name parent_id expiration deleted

query ITTTTTIIITRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRBBTTTTT colnames
query ITTTTTIIITRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRBBTTTTTRRRRR colnames
SELECT * FROM crdb_internal.node_statement_statistics WHERE node_id < 0
----
node_id application_name flags statement_id key anonymized count first_attempt_count max_retries last_error rows_avg rows_var idle_lat_avg idle_lat_var parse_lat_avg parse_lat_var plan_lat_avg plan_lat_var run_lat_avg run_lat_var service_lat_avg service_lat_var overhead_lat_avg overhead_lat_var bytes_read_avg bytes_read_var rows_read_avg rows_read_var rows_written_avg rows_written_var network_bytes_avg network_bytes_var network_msgs_avg network_msgs_var max_mem_usage_avg max_mem_usage_var max_disk_usage_avg max_disk_usage_var contention_time_avg contention_time_var cpu_sql_nanos_avg cpu_sql_nanos_var implicit_txn full_scan sample_plan database_name exec_node_ids txn_fingerprint_id index_recommendations
node_id application_name flags statement_id key anonymized count first_attempt_count max_retries last_error rows_avg rows_var idle_lat_avg idle_lat_var parse_lat_avg parse_lat_var plan_lat_avg plan_lat_var run_lat_avg run_lat_var service_lat_avg service_lat_var overhead_lat_avg overhead_lat_var bytes_read_avg bytes_read_var rows_read_avg rows_read_var rows_written_avg rows_written_var network_bytes_avg network_bytes_var network_msgs_avg network_msgs_var max_mem_usage_avg max_mem_usage_var max_disk_usage_avg max_disk_usage_var contention_time_avg contention_time_var cpu_sql_nanos_avg cpu_sql_nanos_var implicit_txn full_scan sample_plan database_name exec_node_ids txn_fingerprint_id index_recommendations latency_seconds_min latency_seconds_max latency_seconds_p50 latency_seconds_p90 latency_seconds_p99

query ITTTIIRRRRRRRRRRRRRRRRRRRRRR colnames
SELECT * FROM crdb_internal.node_transaction_statistics WHERE node_id < 0

Large diffs are not rendered by default.

38 changes: 31 additions & 7 deletions pkg/sql/sqlstats/insights/detector.go
@@ -16,6 +16,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/appstatspb"
"github.com/cockroachdb/cockroach/pkg/util/quantile"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)

type detector interface {
@@ -50,13 +51,17 @@ func (d *compositeDetector) isSlow(statement *Statement) bool {
return result
}

var desiredQuantiles = map[float64]float64{0.5: 0.05, 0.99: 0.001}
var desiredQuantiles = map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}

type anomalyDetector struct {
settings *cluster.Settings
metrics Metrics
store *list.List
index map[appstatspb.StmtFingerprintID]*list.Element
mu struct {
syncutil.RWMutex

index map[appstatspb.StmtFingerprintID]*list.Element
}
}

type latencySummaryEntry struct {
@@ -85,20 +90,37 @@ func (d *anomalyDetector) isSlow(stmt *Statement) (decision bool) {
return
}

func (d *anomalyDetector) GetPercentileValues(id appstatspb.StmtFingerprintID) PercentileValues {
d.mu.RLock()
defer d.mu.RUnlock()
latencies := PercentileValues{}
if entry, ok := d.mu.index[id]; ok {
latencySummary := entry.Value.(latencySummaryEntry).value
// If more percentiles are added, update the value of `desiredQuantiles` above
// to include the new keys.
latencies.P50 = latencySummary.Query(0.5)
latencies.P90 = latencySummary.Query(0.9)
latencies.P99 = latencySummary.Query(0.99)
}
return latencies
}

func (d *anomalyDetector) withFingerprintLatencySummary(
stmt *Statement, consumer func(latencySummary *quantile.Stream),
) {
d.mu.Lock()
defer d.mu.Unlock()
var latencySummary *quantile.Stream

if element, ok := d.index[stmt.FingerprintID]; ok {
if element, ok := d.mu.index[stmt.FingerprintID]; ok {
// We are already tracking latencies for this fingerprint.
latencySummary = element.Value.(latencySummaryEntry).value
d.store.MoveToFront(element) // Mark this latency summary as recently used.
} else if stmt.LatencyInSeconds >= AnomalyDetectionLatencyThreshold.Get(&d.settings.SV).Seconds() {
// We want to start tracking latencies for this fingerprint.
latencySummary = quantile.NewTargeted(desiredQuantiles)
entry := latencySummaryEntry{key: stmt.FingerprintID, value: latencySummary}
d.index[stmt.FingerprintID] = d.store.PushFront(entry)
d.mu.index[stmt.FingerprintID] = d.store.PushFront(entry)
d.metrics.Fingerprints.Inc(1)
d.metrics.Memory.Inc(latencySummary.ByteSize())
} else {
@@ -114,20 +136,22 @@ func (d *anomalyDetector) withFingerprintLatencySummary(
if d.metrics.Memory.Value() > AnomalyDetectionMemoryLimit.Get(&d.settings.SV) {
element := d.store.Back()
entry := d.store.Remove(element).(latencySummaryEntry)
delete(d.index, entry.key)
delete(d.mu.index, entry.key)
d.metrics.Evictions.Inc(1)
d.metrics.Fingerprints.Dec(1)
d.metrics.Memory.Dec(entry.value.ByteSize())
}
}

func newAnomalyDetector(settings *cluster.Settings, metrics Metrics) *anomalyDetector {
return &anomalyDetector{
anomaly := &anomalyDetector{
settings: settings,
metrics: metrics,
store: list.New(),
index: make(map[appstatspb.StmtFingerprintID]*list.Element),
}
anomaly.mu.index = make(map[appstatspb.StmtFingerprintID]*list.Element)

return anomaly
}

type latencyThresholdDetector struct {
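
The detector now guards its fingerprint index with a mutex because SQLStats reads percentiles concurrently while the ingester keeps writing summaries. A distilled sketch of that locking pattern, substituting the standard library's `sync.RWMutex` for CRDB's `syncutil` wrapper (all names here are illustrative):
```go
package main

import (
	"fmt"
	"sync"
)

// percentileCache distills the pattern above: one goroutine (the ingester)
// writes summaries while SQLStats goroutines read percentiles in parallel.
type percentileCache struct {
	mu struct {
		sync.RWMutex
		index map[uint64][3]float64 // fingerprint ID -> {p50, p90, p99}
	}
}

func newPercentileCache() *percentileCache {
	c := &percentileCache{}
	c.mu.index = make(map[uint64][3]float64)
	return c
}

// record takes the write lock, as withFingerprintLatencySummary does.
func (c *percentileCache) record(id uint64, p [3]float64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.mu.index[id] = p
}

// get takes only the read lock, as GetPercentileValues does, so many
// concurrent readers never block each other.
func (c *percentileCache) get(id uint64) [3]float64 {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.mu.index[id] // zero value when the fingerprint isn't tracked
}

func main() {
	c := newPercentileCache()
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); c.record(42, [3]float64{0.6, 1.0, 3.0}) }()
	go func() { defer wg.Done(); fmt.Println(c.get(42)) }()
	wg.Wait()
}
```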
19 changes: 18 additions & 1 deletion pkg/sql/sqlstats/insights/insights.go
@@ -16,6 +16,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/appstatspb"
"github.com/cockroachdb/cockroach/pkg/sql/clusterunique"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/stop"
@@ -153,6 +154,16 @@ type Reader interface {
IterateInsights(context.Context, func(context.Context, *Insight))
}

type LatencyInformation interface {
GetPercentileValues(fingerprintID appstatspb.StmtFingerprintID) PercentileValues
}

type PercentileValues struct {
P50 float64
P90 float64
P99 float64
}

// Provider offers access to the insights subsystem.
type Provider interface {
// Start launches the background tasks necessary for processing insights.
@@ -164,21 +175,27 @@

// Reader returns an object that offers read access to any detected insights.
Reader() Reader

// LatencyInformation returns an object that offers read access to latency information,
// such as percentiles.
LatencyInformation() LatencyInformation
}

// New builds a new Provider.
func New(st *cluster.Settings, metrics Metrics) Provider {
store := newStore(st)
anomalyDetector := newAnomalyDetector(st, metrics)

return &defaultProvider{
store: store,
ingester: newConcurrentBufferIngester(
newRegistry(st, &compositeDetector{detectors: []detector{
&latencyThresholdDetector{st: st},
newAnomalyDetector(st, metrics),
anomalyDetector,
}}, &compositeSink{sinks: []sink{
store,
}}),
),
anomalyDetector: anomalyDetector,
}
}
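
With the accessor in place, a consumer holding a `Provider` (as the SQL server does in `conn_executor.go` above) can fetch percentiles by fingerprint. A compile-level sketch; the reporting function and its formatting are invented for illustration:
```go
package sketch

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/sql/appstatspb"
	"github.com/cockroachdb/cockroach/pkg/sql/sqlstats/insights"
)

// reportPercentiles shows the intended read path: SQLStats holds the
// LatencyInformation handle and asks for a fingerprint's percentiles.
func reportPercentiles(p insights.Provider, id appstatspb.StmtFingerprintID) {
	v := p.LatencyInformation().GetPercentileValues(id)
	// Zero values mean the statement never ran longer than
	// AnomalyDetectionLatencyThreshold, so it was never tracked.
	fmt.Printf("p50=%.3fs p90=%.3fs p99=%.3fs\n", v.P50, v.P90, v.P99)
}
```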
9 changes: 7 additions & 2 deletions pkg/sql/sqlstats/insights/provider.go
@@ -18,8 +18,9 @@ import (
)

type defaultProvider struct {
store *lockingStore
ingester *concurrentBufferIngester
store *lockingStore
ingester *concurrentBufferIngester
anomalyDetector *anomalyDetector
}

var _ Provider = &defaultProvider{}
@@ -40,6 +41,10 @@ func (p *defaultProvider) Reader() Reader {
return p.store
}

func (p *defaultProvider) LatencyInformation() LatencyInformation {
return p.anomalyDetector
}

type nullWriter struct{}

func (n *nullWriter) ObserveStatement(_ clusterunique.ID, _ *Statement) {
@@ -102,7 +102,14 @@ func TestSQLStatsJsonEncoding(t *testing.T) {
},
"nodes": [{{joinInts .IntArray}}],
"planGists": [{{joinStrings .StringArray}}],
"indexes": [{{joinStrings .StringArray}}]
"indexes": [{{joinStrings .StringArray}}],
"latencyInfo": {
"min": {{.Float}},
"max": {{.Float}},
"p50": {{.Float}},
"p90": {{.Float}},
"p99": {{.Float}}
}
},
"execution_statistics": {
"cnt": {{.Int64}},
@@ -224,8 +231,15 @@
"mean": {{.Float}},
"sqDiff": {{.Float}}
},
"nodes": [{{joinInts .IntArray}}]
"planGists": [{{joinStrings .StringArray}}]
"nodes": [{{joinInts .IntArray}}],
"planGists": [{{joinStrings .StringArray}}],
"latencyInfo": {
"min": {{.Float}},
"max": {{.Float}},
"p50": {{.Float}},
"p90": {{.Float}},
"p99": {{.Float}},
}
},
"execution_statistics": {
"cnt": {{.Int64}},