Skip to content

Commit

Permalink
sql: measure CPU time spent during SQL execution
Browse files Browse the repository at this point in the history
This commit adds tracking for CPU time spent during SQL execution. The CPU
time is tracked at the operator granularity when statistics collection is
enabled, similar to execution time.

For now, the CPU time is only surfaced in the output of `EXPLAIN ANALYZE`
variants. A future PR will add support for logging this value in the
statement statistics.

Note that the main goroutines of the streamer, hash router, and outbox are
not currently tracked by this work. However, it is expected that these should
be relatively cheap, and shouldn't significantly impact the measurement.

Informs: cockroachdb#87213

Release note (sql change): CPU time spent during SQL execution is now visible
in the output of queries run with `EXPLAIN ANALYZE`. This measure does not
include CPU time spent while serving KV requests. This can be useful for
diagnosing performance issues and optimizing SQL queries.
  • Loading branch information
DrewKimball committed Dec 20, 2022
1 parent 48ef0d8 commit e308463
Show file tree
Hide file tree
Showing 14 changed files with 135 additions and 7 deletions.
1 change: 1 addition & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,7 @@ go_library(
"//pkg/util/errorutil/unimplemented",
"//pkg/util/fsm",
"//pkg/util/grpcutil",
"//pkg/util/grunning",
"//pkg/util/hlc",
"//pkg/util/humanizeutil",
"//pkg/util/interval",
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/colflow/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ go_library(
"//pkg/util",
"//pkg/util/admission",
"//pkg/util/buildutil",
"//pkg/util/grunning",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/mon",
Expand Down
23 changes: 19 additions & 4 deletions pkg/sql/colflow/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/execstats"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/grunning"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
Expand All @@ -32,6 +33,7 @@ import (
// colexecop.VectorizedStatsCollector's childStatsCollectors.
type childStatsCollector interface {
getElapsedTime() time.Duration
getElapsedCPUTime() time.Duration
}

// batchInfoCollector is a helper used by collector implementations.
Expand Down Expand Up @@ -139,31 +141,41 @@ func (bic *batchInfoCollector) Next() coldata.Batch {
// operator. ok indicates whether the stats collection was successful.
func (bic *batchInfoCollector) finishAndGetStats() (
numBatches, numTuples uint64,
time time.Duration,
time, cpuTime time.Duration,
ok bool,
) {
tm := bic.stopwatch.Elapsed()
cpuTm := bic.stopwatch.ElapsedCPU()
// Subtract the time spent in each of the child stats collectors, to produce
// the amount of time that the wrapped operator spent doing work itself, not
// including time spent waiting on its inputs.
for _, statsCollectors := range bic.childStatsCollectors {
tm -= statsCollectors.getElapsedTime()
cpuTm -= statsCollectors.getElapsedCPUTime()
}
if buildutil.CrdbTestBuild {
if tm < 0 {
colexecerror.InternalError(errors.AssertionFailedf("unexpectedly execution time is negative"))
}
if cpuTm < 0 {
colexecerror.InternalError(errors.AssertionFailedf("unexpectedly CPU time is negative"))
}
}
bic.mu.Lock()
defer bic.mu.Unlock()
return bic.mu.numBatches, bic.mu.numTuples, tm, bic.mu.initialized
return bic.mu.numBatches, bic.mu.numTuples, tm, cpuTm, bic.mu.initialized
}

// getElapsedTime implements the childStatsCollector interface.
func (bic *batchInfoCollector) getElapsedTime() time.Duration {
return bic.stopwatch.Elapsed()
}

// getElapsedCPUTime implements the childStatsCollector interface.
func (bic *batchInfoCollector) getElapsedCPUTime() time.Duration {
return bic.stopwatch.ElapsedCPU()
}

// newVectorizedStatsCollector creates a colexecop.VectorizedStatsCollector
// which wraps 'op' that corresponds to a component with either ProcessorID or
// StreamID 'id' (with 'idTagKey' distinguishing between the two). 'kvReader' is
Expand Down Expand Up @@ -203,7 +215,7 @@ type vectorizedStatsCollectorImpl struct {

// GetStats is part of the colexecop.VectorizedStatsCollector interface.
func (vsc *vectorizedStatsCollectorImpl) GetStats() *execinfrapb.ComponentStats {
numBatches, numTuples, time, ok := vsc.batchInfoCollector.finishAndGetStats()
numBatches, numTuples, time, cpuTime, ok := vsc.batchInfoCollector.finishAndGetStats()
if !ok {
// The stats collection wasn't successful for some reason, so we will
// return an empty object (since nil is not allowed by the contract of
Expand Down Expand Up @@ -255,6 +267,9 @@ func (vsc *vectorizedStatsCollectorImpl) GetStats() *execinfrapb.ComponentStats
} else {
s.Exec.ExecTime.Set(time)
}
if grunning.Supported() {
s.Exec.CPUTime.Set(cpuTime)
}

s.Output.NumBatches.Set(numBatches)
s.Output.NumTuples.Set(numTuples)
Expand Down Expand Up @@ -289,7 +304,7 @@ type networkVectorizedStatsCollectorImpl struct {

// GetStats is part of the colexecop.VectorizedStatsCollector interface.
func (nvsc *networkVectorizedStatsCollectorImpl) GetStats() *execinfrapb.ComponentStats {
numBatches, numTuples, time, ok := nvsc.batchInfoCollector.finishAndGetStats()
numBatches, numTuples, time, _, ok := nvsc.batchInfoCollector.finishAndGetStats()
if !ok {
// The stats collection wasn't successful for some reason, so we will
// return an empty object (since nil is not allowed by the contract of
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colflow/vectorized_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ func (s *vectorizedFlowCreator) wrapWithVectorizedStatsCollectorBase(
component execinfrapb.ComponentID,
monitors []*mon.BytesMonitor,
) error {
inputWatch := timeutil.NewStopWatch()
inputWatch := timeutil.NewStopWatchWithCPU()
var memMonitors, diskMonitors []*mon.BytesMonitor
for _, m := range monitors {
if m.Resource() == mon.DiskResource {
Expand Down
7 changes: 7 additions & 0 deletions pkg/sql/execinfrapb/component_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,9 @@ func (s *ComponentStats) formatStats(fn func(suffix string, value interface{}))
if s.Exec.MaxAllocatedDisk.HasValue() {
fn("max sql temp disk usage", humanize.IBytes(s.Exec.MaxAllocatedDisk.Value()))
}
if s.Exec.CPUTime.HasValue() {
fn("sql cpu time", humanizeutil.Duration(s.Exec.CPUTime.Value()))
}

// Output stats.
if s.Output.NumBatches.HasValue() {
Expand Down Expand Up @@ -267,6 +270,9 @@ func (s *ComponentStats) Union(other *ComponentStats) *ComponentStats {
if !result.Exec.ConsumedRU.HasValue() {
result.Exec.ConsumedRU = other.Exec.ConsumedRU
}
if !result.Exec.CPUTime.HasValue() {
result.Exec.CPUTime = other.Exec.CPUTime
}

// Output stats.
if !result.Output.NumBatches.HasValue() {
Expand Down Expand Up @@ -360,6 +366,7 @@ func (s *ComponentStats) MakeDeterministic() {
resetUint(&s.Exec.MaxAllocatedMem)
resetUint(&s.Exec.MaxAllocatedDisk)
resetUint(&s.Exec.ConsumedRU)
timeVal(&s.Exec.CPUTime)

// Output.
resetUint(&s.Output.NumBatches)
Expand Down
6 changes: 4 additions & 2 deletions pkg/sql/execinfrapb/component_stats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,9 @@ message ExecStats {
// Maximum scratch disk allocated by the component.
optional util.optional.Uint max_allocated_disk = 3 [(gogoproto.nullable) = false];
// Amount of RUs consumed while executing the component.
optional util.optional.Uint consumed_r_u = 4 [(gogoproto.nullable) = false];
optional util.optional.Uint consumed_ru = 4 [(gogoproto.nullable) = false, (gogoproto.customname) = "ConsumedRU"];
// CPU time spent executing the component.
optional util.optional.Duration cpu_time = 5 [(gogoproto.nullable) = false, (gogoproto.customname) = "CPUTime"];
}

// OutputStats contains statistics about the output (results) of a component.
Expand All @@ -169,5 +171,5 @@ message FlowStats {
optional util.optional.Uint max_disk_usage = 2 [(gogoproto.nullable) = false];
// Amount of RUs consumed due to CPU usage while executing the flow. Currently
// only used for remote flows.
optional util.optional.Uint consumed_r_u = 3 [(gogoproto.nullable) = false];
optional util.optional.Uint consumed_ru = 3 [(gogoproto.nullable) = false, (gogoproto.customname) = "ConsumedRU"];
}
9 changes: 9 additions & 0 deletions pkg/sql/execstats/traceanalyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ type NodeLevelStats struct {
NetworkMessagesGroupedByNode map[base.SQLInstanceID]int64
ContentionTimeGroupedByNode map[base.SQLInstanceID]time.Duration
RUEstimateGroupedByNode map[base.SQLInstanceID]int64
CPUTimeGroupedByNode map[base.SQLInstanceID]time.Duration
}

// QueryLevelStats returns all the query level stats that correspond to the
Expand All @@ -127,6 +128,7 @@ type QueryLevelStats struct {
ContentionTime time.Duration
ContentionEvents []roachpb.ContentionEvent
RUEstimate int64
CPUTime time.Duration
}

// QueryLevelStatsWithErr is the same as QueryLevelStats, but also tracks
Expand Down Expand Up @@ -162,6 +164,7 @@ func (s *QueryLevelStats) Accumulate(other QueryLevelStats) {
s.ContentionTime += other.ContentionTime
s.ContentionEvents = append(s.ContentionEvents, other.ContentionEvents...)
s.RUEstimate += other.RUEstimate
s.CPUTime += other.CPUTime
}

// TraceAnalyzer is a struct that helps calculate top-level statistics from a
Expand Down Expand Up @@ -238,6 +241,7 @@ func (a *TraceAnalyzer) ProcessStats() error {
NetworkMessagesGroupedByNode: make(map[base.SQLInstanceID]int64),
ContentionTimeGroupedByNode: make(map[base.SQLInstanceID]time.Duration),
RUEstimateGroupedByNode: make(map[base.SQLInstanceID]int64),
CPUTimeGroupedByNode: make(map[base.SQLInstanceID]time.Duration),
}
var errs error

Expand All @@ -254,6 +258,7 @@ func (a *TraceAnalyzer) ProcessStats() error {
a.nodeLevelStats.KVTimeGroupedByNode[instanceID] += stats.KV.KVTime.Value()
a.nodeLevelStats.ContentionTimeGroupedByNode[instanceID] += stats.KV.ContentionTime.Value()
a.nodeLevelStats.RUEstimateGroupedByNode[instanceID] += int64(stats.Exec.ConsumedRU.Value())
a.nodeLevelStats.CPUTimeGroupedByNode[instanceID] += stats.Exec.CPUTime.Value()
allContentionEvents = append(allContentionEvents, stats.KV.ContentionEvents...)
}

Expand Down Expand Up @@ -374,6 +379,10 @@ func (a *TraceAnalyzer) ProcessStats() error {
a.queryLevelStats.RUEstimate += estimatedRU
}

for _, cpuTime := range a.nodeLevelStats.CPUTimeGroupedByNode {
a.queryLevelStats.CPUTime += cpuTime
}

a.queryLevelStats.ContentionEvents = allContentionEvents

return errs
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/instrumentation.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/grunning"
"github.com/cockroachdb/cockroach/pkg/util/intsets"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
Expand Down Expand Up @@ -552,6 +553,9 @@ func (ih *instrumentationHelper) emitExplainAnalyzePlanToOutputBuilder(
ob.AddMaxMemUsage(queryStats.MaxMemUsage)
ob.AddNetworkStats(queryStats.NetworkMessages, queryStats.NetworkBytesSent)
ob.AddMaxDiskUsage(queryStats.MaxDiskUsage)
if grunning.Supported() {
ob.AddCPUTime(queryStats.CPUTime)
}
if ih.isTenant && ih.outputMode != unmodifiedOutput && ih.vectorized {
// Only output RU estimate if this is a tenant running EXPLAIN ANALYZE.
// Additionally, RUs aren't correctly propagated in all cases for plans
Expand Down Expand Up @@ -693,6 +697,7 @@ func (m execNodeTraceMetadata) annotateExplain(
nodeStats.VectorizedBatchCount.MaybeAdd(stats.Output.NumBatches)
nodeStats.MaxAllocatedMem.MaybeAdd(stats.Exec.MaxAllocatedMem)
nodeStats.MaxAllocatedDisk.MaybeAdd(stats.Exec.MaxAllocatedDisk)
nodeStats.SQLCPUTime.MaybeAdd(stats.Exec.CPUTime)
}
// If we didn't get statistics for all processors, we don't show the
// incomplete results. In the future, we may consider an incomplete flag
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/opt/exec/explain/emit.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,9 @@ func (e *emitter) emitNodeAttributes(n *Node) error {
if s.MaxAllocatedDisk.HasValue() {
e.ob.AddField("estimated max sql temp disk usage", humanize.IBytes(s.MaxAllocatedDisk.Value()))
}
if s.SQLCPUTime.HasValue() {
e.ob.AddField("sql cpu time", string(humanizeutil.Duration(s.SQLCPUTime.Value())))
}
if e.ob.flags.Verbose {
if s.StepCount.HasValue() {
e.ob.AddField("MVCC step count (ext/int)", fmt.Sprintf("%s/%s",
Expand Down
10 changes: 10 additions & 0 deletions pkg/sql/opt/exec/explain/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,16 @@ func (ob *OutputBuilder) AddMaxDiskUsage(bytes int64) {
}
}

// AddCPUTime adds a top-level field for the cumulative cpu time spent by SQL
// execution.
func (ob *OutputBuilder) AddCPUTime(cpuTime time.Duration) {
ob.AddRedactableTopLevelField(
RedactVolatile,
"sql cpu time",
string(humanizeutil.Duration(cpuTime)),
)
}

// AddRUEstimate adds a top-level field for the estimated number of RUs consumed
// by the query.
func (ob *OutputBuilder) AddRUEstimate(ru int64) {
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/opt/exec/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ type ExecutionStats struct {

MaxAllocatedMem optional.Uint
MaxAllocatedDisk optional.Uint
SQLCPUTime optional.Duration

// Nodes on which this operator was executed.
Nodes []string
Expand Down
2 changes: 2 additions & 0 deletions pkg/util/timeutil/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "timeutil",
srcs = [
"cpustopwatch.go",
"manual_time.go",
"stopwatch.go",
"time.go",
Expand All @@ -17,6 +18,7 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/util/timeutil",
visibility = ["//visibility:public"],
deps = [
"//pkg/util/grunning",
"//pkg/util/syncutil",
"@com_github_cockroachdb_errors//:errors",
],
Expand Down
47 changes: 47 additions & 0 deletions pkg/util/timeutil/cpustopwatch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package timeutil

import (
"time"

"github.com/cockroachdb/cockroach/pkg/util/grunning"
)

// cpuStopWatch is a utility stop watch for measuring CPU time spent by a
// component. It can be safely started and stopped multiple times, but is
// not safe to use concurrently. If cpuStopWatch is nil, all operations are
// no-ops.
type cpuStopWatch struct {
startCPU time.Duration
totalCPU time.Duration
}

func (w *cpuStopWatch) start() {
if w == nil {
return
}
w.startCPU = grunning.Time()
}

func (w *cpuStopWatch) stop() {
if w == nil {
return
}
w.totalCPU += grunning.Difference(w.startCPU, grunning.Time())
}

func (w *cpuStopWatch) elapsed() time.Duration {
if w == nil {
return 0
}
return w.totalCPU
}
Loading

0 comments on commit e308463

Please sign in to comment.