Skip to content

Commit

Permalink
sql: measure CPU time spent during SQL execution
Browse files Browse the repository at this point in the history
This commit adds tracking for CPU time spent during SQL execution. The CPU
time is tracked at the operator granularity when statistics collection is
enabled, similar to execution time.

For now, the CPU time is only surfaced in the output of `EXPLAIN ANALYZE`
variants. A future PR will add support for logging this value in the
statement statistics.

Note that the main goroutines of the streamer, hash router, and outbox are
not currently tracked by this work. However, these are expected to be
relatively cheap, so they should not significantly impact the measurement.

Additionally, KV work is performed on a SQL goroutine in some cases
(e.g. when there is a single-range request for a local range). This makes
it necessary to track CPU time spent fulfilling KV requests on a SQL
goroutine so it can be subtracted from the total measured CPU time.
This logic is handled by the `kvBatchFetcherHelper` for the operators
that only perform reads (e.g. lookup joins and scans).

Because mutations do not record stats, they currently have no way to
differentiate KV CPU time from SQL CPU time. For this reason, a plan that
contains mutations will not output CPU time.

Informs: cockroachdb#87213

Release note (sql change): CPU time spent during SQL execution is now visible
in the output of queries run with `EXPLAIN ANALYZE`. This measure does not
include CPU time spent while serving KV requests, and CPU time is not shown for
queries that perform mutations. This can be useful for diagnosing performance
issues and optimizing SQL queries.
  • Loading branch information
DrewKimball committed Jan 9, 2023
1 parent c2ef4c8 commit 2880abb
Show file tree
Hide file tree
Showing 27 changed files with 314 additions and 25 deletions.
1 change: 1 addition & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,7 @@ go_library(
"//pkg/util/errorutil/unimplemented",
"//pkg/util/fsm",
"//pkg/util/grpcutil",
"//pkg/util/grunning",
"//pkg/util/hlc",
"//pkg/util/humanizeutil",
"//pkg/util/interval",
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/colexecop/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ type KVReader interface {
// GetScanStats returns statistics about the scan that happened during the
// KV reads. It must be safe for concurrent use.
GetScanStats() execstats.ScanStats
// GetKVCPUTime returns the CPU time consumed *on the current goroutine* by
// KV requests. It must be safe for concurrent use. It is used to calculate
// the SQL CPU time.
GetKVCPUTime() time.Duration
}

// ZeroInputNode is an execopnode.OpNode with no inputs.
Expand Down
28 changes: 20 additions & 8 deletions pkg/sql/colfetcher/cfetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"sort"
"strings"
"sync"
"time"

"github.com/cockroachdb/apd/v3"
"github.com/cockroachdb/cockroach/pkg/col/coldata"
Expand Down Expand Up @@ -220,15 +221,18 @@ type cFetcher struct {
nextKVer storage.NextKVer
// fetcher, if set, is the same object as nextKVer.
fetcher *row.KVFetcher
// bytesRead and batchRequestsIssued store the total number of bytes read
// and of BatchRequests issued, respectively, by this cFetcher throughout
// its lifetime in case when the underlying row.KVFetcher has already been
// closed and nil-ed out.
//
// The fields should not be accessed directly by the users of the cFetcher -
// getBytesRead() and getBatchRequestsIssued() should be used instead.
bytesRead int64
// bytesRead stores the total number of bytes read by this cFetcher throughout
// its lifetime after the underlying row.KVFetcher has been closed and nil-ed
// out. It should only be accessed through getBytesRead().
bytesRead int64
// batchRequestsIssued stores the total number of BatchRequests issued by this
// cFetcher after the underlying row.KVFetcher has been closed and nil-ed out.
// It can only be accessed through getBatchRequestsIssued().
batchRequestsIssued int64
// kvCPUTime stores the CPU time spent by this cFetcher while fulfilling KV
// requests *in the current goroutine* after the underlying row.KVFetcher has
// been closed and nil-ed out. It can only be accessed through getKVCPUTime().
kvCPUTime time.Duration

// machine contains fields that get updated during the run of the fetcher.
machine struct {
Expand Down Expand Up @@ -1292,6 +1296,13 @@ func (cf *cFetcher) getBytesRead() int64 {
return cf.bytesRead
}

// getKVCPUTime returns the CPU time spent by the cFetcher fulfilling KV
// requests on the current goroutine. While the underlying row.KVFetcher is
// still set, the value is read from it directly; once the fetcher has been
// closed and nil-ed out, the value saved in kvCPUTime is returned instead.
func (cf *cFetcher) getKVCPUTime() time.Duration {
	if cf.fetcher == nil {
		// The fetcher is gone, so fall back to the stored value.
		return cf.kvCPUTime
	}
	return cf.fetcher.GetKVCPUTime()
}

// getBatchRequestsIssued returns the number of BatchRequests issued by the
// cFetcher throughout its lifetime so far.
func (cf *cFetcher) getBatchRequestsIssued() int64 {
Expand Down Expand Up @@ -1327,6 +1338,7 @@ func (cf *cFetcher) Close(ctx context.Context) {
if cf.fetcher != nil {
cf.bytesRead = cf.fetcher.GetBytesRead()
cf.batchRequestsIssued = cf.fetcher.GetBatchRequestsIssued()
cf.kvCPUTime = cf.fetcher.GetKVCPUTime()
cf.fetcher.Close(ctx)
cf.fetcher = nil
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/sql/colfetcher/colbatch_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,13 @@ func (s *ColBatchScan) GetScanStats() execstats.ScanStats {
return execstats.GetScanStats(s.Ctx, nil /* recording */)
}

// GetKVCPUTime is part of the colexecop.KVReader interface. It returns the KV
// CPU time reported by the scan's cFetcher.
func (s *ColBatchScan) GetKVCPUTime() time.Duration {
// Hold s.mu while reading from the cFetcher: the KVReader contract requires
// this method to be safe for concurrent use.
s.mu.Lock()
defer s.mu.Unlock()
return s.cf.getKVCPUTime()
}

var colBatchScanPool = sync.Pool{
New: func() interface{} {
return &ColBatchScan{}
Expand Down
7 changes: 7 additions & 0 deletions pkg/sql/colfetcher/index_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,13 @@ func (s *ColIndexJoin) GetBatchRequestsIssued() int64 {
return s.cf.getBatchRequestsIssued()
}

// GetKVCPUTime is part of the colexecop.KVReader interface. It returns the KV
// CPU time reported by the index join's cFetcher.
func (s *ColIndexJoin) GetKVCPUTime() time.Duration {
// Hold s.mu while reading from the cFetcher: the KVReader contract requires
// this method to be safe for concurrent use.
s.mu.Lock()
defer s.mu.Unlock()
return s.cf.getKVCPUTime()
}

// GetContentionInfo is part of the colexecop.KVReader interface.
func (s *ColIndexJoin) GetContentionInfo() (time.Duration, []roachpb.ContentionEvent) {
return execstats.GetCumulativeContentionTime(s.Ctx, nil /* recording */)
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/colflow/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ go_library(
"//pkg/util",
"//pkg/util/admission",
"//pkg/util/buildutil",
"//pkg/util/grunning",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/mon",
Expand Down
33 changes: 29 additions & 4 deletions pkg/sql/colflow/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/execstats"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/grunning"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
Expand All @@ -32,6 +33,7 @@ import (
// colexecop.VectorizedStatsCollector's childStatsCollectors.
type childStatsCollector interface {
// getElapsedTime returns the execution time measured by the child
// collector. A parent collector subtracts it from its own measurement so
// that time spent waiting on inputs is not attributed to the parent.
getElapsedTime() time.Duration
// getElapsedCPUTime returns the CPU time measured by the child collector.
// A parent collector subtracts it from its own measured CPU time in the
// same way as the execution time.
getElapsedCPUTime() time.Duration
}

// batchInfoCollector is a helper used by collector implementations.
Expand Down Expand Up @@ -139,31 +141,41 @@ func (bic *batchInfoCollector) Next() coldata.Batch {
// operator. ok indicates whether the stats collection was successful.
func (bic *batchInfoCollector) finishAndGetStats() (
numBatches, numTuples uint64,
time time.Duration,
time, cpuTime time.Duration,
ok bool,
) {
tm := bic.stopwatch.Elapsed()
cpuTm := bic.stopwatch.ElapsedCPU()
// Subtract the time spent in each of the child stats collectors, to produce
// the amount of time that the wrapped operator spent doing work itself, not
// including time spent waiting on its inputs.
for _, statsCollectors := range bic.childStatsCollectors {
tm -= statsCollectors.getElapsedTime()
cpuTm -= statsCollectors.getElapsedCPUTime()
}
if buildutil.CrdbTestBuild {
if tm < 0 {
colexecerror.InternalError(errors.AssertionFailedf("unexpectedly execution time is negative"))
}
if cpuTm < 0 {
colexecerror.InternalError(errors.AssertionFailedf("unexpectedly CPU time is negative"))
}
}
bic.mu.Lock()
defer bic.mu.Unlock()
return bic.mu.numBatches, bic.mu.numTuples, tm, bic.mu.initialized
return bic.mu.numBatches, bic.mu.numTuples, tm, cpuTm, bic.mu.initialized
}

// getElapsedTime implements the childStatsCollector interface. It returns the
// elapsed time reported by this collector's stopwatch, without subtracting the
// time of this collector's own children.
func (bic *batchInfoCollector) getElapsedTime() time.Duration {
return bic.stopwatch.Elapsed()
}

// getElapsedCPUTime implements the childStatsCollector interface. It returns
// the elapsed CPU time reported by this collector's stopwatch, without
// subtracting the CPU time of this collector's own children.
func (bic *batchInfoCollector) getElapsedCPUTime() time.Duration {
return bic.stopwatch.ElapsedCPU()
}

// newVectorizedStatsCollector creates a colexecop.VectorizedStatsCollector
// which wraps 'op' that corresponds to a component with either ProcessorID or
// StreamID 'id' (with 'idTagKey' distinguishing between the two). 'kvReader' is
Expand Down Expand Up @@ -203,7 +215,7 @@ type vectorizedStatsCollectorImpl struct {

// GetStats is part of the colexecop.VectorizedStatsCollector interface.
func (vsc *vectorizedStatsCollectorImpl) GetStats() *execinfrapb.ComponentStats {
numBatches, numTuples, time, ok := vsc.batchInfoCollector.finishAndGetStats()
numBatches, numTuples, time, cpuTime, ok := vsc.batchInfoCollector.finishAndGetStats()
if !ok {
// The stats collection wasn't successful for some reason, so we will
// return an empty object (since nil is not allowed by the contract of
Expand All @@ -221,6 +233,12 @@ func (vsc *vectorizedStatsCollectorImpl) GetStats() *execinfrapb.ComponentStats
var s *execinfrapb.ComponentStats
if vsc.columnarizer != nil {
s = vsc.columnarizer.GetStats()

// If the columnarizer is wrapping an operator that performs KV operations,
// we must subtract the CPU time spent performing KV work on a SQL goroutine
// from the measured CPU time. If the wrapped operator does not perform KV
// operations, this value will be zero.
cpuTime -= s.KV.KVCPUTime.Value()
} else {
// There was no root columnarizer, so create a new stats object.
s = &execinfrapb.ComponentStats{Component: vsc.componentID}
Expand Down Expand Up @@ -252,9 +270,16 @@ func (vsc *vectorizedStatsCollectorImpl) GetStats() *execinfrapb.ComponentStats
scanStats := vsc.kvReader.GetScanStats()
execstats.PopulateKVMVCCStats(&s.KV, &scanStats)
s.Exec.ConsumedRU.Set(scanStats.ConsumedRU)

// In order to account for SQL CPU time, we have to subtract the CPU time
// spent while serving KV requests on a SQL goroutine.
cpuTime -= vsc.kvReader.GetKVCPUTime()
} else {
s.Exec.ExecTime.Set(time)
}
if cpuTime > 0 && grunning.Supported() {
s.Exec.CPUTime.Set(cpuTime)
}

s.Output.NumBatches.Set(numBatches)
s.Output.NumTuples.Set(numTuples)
Expand Down Expand Up @@ -289,7 +314,7 @@ type networkVectorizedStatsCollectorImpl struct {

// GetStats is part of the colexecop.VectorizedStatsCollector interface.
func (nvsc *networkVectorizedStatsCollectorImpl) GetStats() *execinfrapb.ComponentStats {
numBatches, numTuples, time, ok := nvsc.batchInfoCollector.finishAndGetStats()
numBatches, numTuples, time, _, ok := nvsc.batchInfoCollector.finishAndGetStats()
if !ok {
// The stats collection wasn't successful for some reason, so we will
// return an empty object (since nil is not allowed by the contract of
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colflow/vectorized_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ func (s *vectorizedFlowCreator) wrapWithVectorizedStatsCollectorBase(
component execinfrapb.ComponentID,
monitors []*mon.BytesMonitor,
) error {
inputWatch := timeutil.NewStopWatch()
inputWatch := timeutil.NewStopWatchWithCPU()
var memMonitors, diskMonitors []*mon.BytesMonitor
for _, m := range monitors {
if m.Resource() == mon.DiskResource {
Expand Down
7 changes: 7 additions & 0 deletions pkg/sql/execinfrapb/component_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,9 @@ func (s *ComponentStats) formatStats(fn func(suffix string, value interface{}))
if s.Exec.MaxAllocatedDisk.HasValue() {
fn("max sql temp disk usage", humanize.IBytes(s.Exec.MaxAllocatedDisk.Value()))
}
if s.Exec.CPUTime.HasValue() {
fn("sql cpu time", humanizeutil.Duration(s.Exec.CPUTime.Value()))
}

// Output stats.
if s.Output.NumBatches.HasValue() {
Expand Down Expand Up @@ -267,6 +270,9 @@ func (s *ComponentStats) Union(other *ComponentStats) *ComponentStats {
if !result.Exec.ConsumedRU.HasValue() {
result.Exec.ConsumedRU = other.Exec.ConsumedRU
}
if !result.Exec.CPUTime.HasValue() {
result.Exec.CPUTime = other.Exec.CPUTime
}

// Output stats.
if !result.Output.NumBatches.HasValue() {
Expand Down Expand Up @@ -360,6 +366,7 @@ func (s *ComponentStats) MakeDeterministic() {
resetUint(&s.Exec.MaxAllocatedMem)
resetUint(&s.Exec.MaxAllocatedDisk)
resetUint(&s.Exec.ConsumedRU)
timeVal(&s.Exec.CPUTime)

// Output.
resetUint(&s.Output.NumBatches)
Expand Down
16 changes: 14 additions & 2 deletions pkg/sql/execinfrapb/component_stats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,13 @@ message KVStats {

// contention_events hit at the statement level.
repeated cockroach.roachpb.ContentionEvent contention_events = 10 [(gogoproto.nullable) = false];

// Cumulative CPU time spent fulfilling KV requests on a SQL goroutine. This
// does not include CPU time spent on a remote node, but does include time
// spent on a Scan's goroutine. This is only used to calculate SQL CPU time
// for processors in the row-engine that perform KV requests.
optional util.optional.Duration kv_cpu_time = 11 [(gogoproto.nullable) = false,
(gogoproto.customname) = "KVCPUTime"];
}

// ExecStats contains statistics about the execution of a component.
Expand All @@ -151,7 +158,11 @@ message ExecStats {
// Maximum scratch disk allocated by the component.
optional util.optional.Uint max_allocated_disk = 3 [(gogoproto.nullable) = false];
// Amount of RUs consumed while executing the component.
optional util.optional.Uint consumed_r_u = 4 [(gogoproto.nullable) = false];
optional util.optional.Uint consumed_ru = 4 [(gogoproto.nullable) = false,
(gogoproto.customname) = "ConsumedRU"];
// CPU time spent executing the component.
optional util.optional.Duration cpu_time = 5 [(gogoproto.nullable) = false,
(gogoproto.customname) = "CPUTime"];
}

// OutputStats contains statistics about the output (results) of a component.
Expand All @@ -169,5 +180,6 @@ message FlowStats {
optional util.optional.Uint max_disk_usage = 2 [(gogoproto.nullable) = false];
// Amount of RUs consumed due to CPU usage while executing the flow. Currently
// only used for remote flows.
optional util.optional.Uint consumed_r_u = 3 [(gogoproto.nullable) = false];
optional util.optional.Uint consumed_ru = 3 [(gogoproto.nullable) = false,
(gogoproto.customname) = "ConsumedRU"];
}
9 changes: 9 additions & 0 deletions pkg/sql/execstats/traceanalyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ type NodeLevelStats struct {
NetworkMessagesGroupedByNode map[base.SQLInstanceID]int64
ContentionTimeGroupedByNode map[base.SQLInstanceID]time.Duration
RUEstimateGroupedByNode map[base.SQLInstanceID]int64
CPUTimeGroupedByNode map[base.SQLInstanceID]time.Duration
}

// QueryLevelStats returns all the query level stats that correspond to the
Expand All @@ -127,6 +128,7 @@ type QueryLevelStats struct {
ContentionTime time.Duration
ContentionEvents []roachpb.ContentionEvent
RUEstimate int64
CPUTime time.Duration
}

// QueryLevelStatsWithErr is the same as QueryLevelStats, but also tracks
Expand Down Expand Up @@ -162,6 +164,7 @@ func (s *QueryLevelStats) Accumulate(other QueryLevelStats) {
s.ContentionTime += other.ContentionTime
s.ContentionEvents = append(s.ContentionEvents, other.ContentionEvents...)
s.RUEstimate += other.RUEstimate
s.CPUTime += other.CPUTime
}

// TraceAnalyzer is a struct that helps calculate top-level statistics from a
Expand Down Expand Up @@ -238,6 +241,7 @@ func (a *TraceAnalyzer) ProcessStats() error {
NetworkMessagesGroupedByNode: make(map[base.SQLInstanceID]int64),
ContentionTimeGroupedByNode: make(map[base.SQLInstanceID]time.Duration),
RUEstimateGroupedByNode: make(map[base.SQLInstanceID]int64),
CPUTimeGroupedByNode: make(map[base.SQLInstanceID]time.Duration),
}
var errs error

Expand All @@ -254,6 +258,7 @@ func (a *TraceAnalyzer) ProcessStats() error {
a.nodeLevelStats.KVTimeGroupedByNode[instanceID] += stats.KV.KVTime.Value()
a.nodeLevelStats.ContentionTimeGroupedByNode[instanceID] += stats.KV.ContentionTime.Value()
a.nodeLevelStats.RUEstimateGroupedByNode[instanceID] += int64(stats.Exec.ConsumedRU.Value())
a.nodeLevelStats.CPUTimeGroupedByNode[instanceID] += stats.Exec.CPUTime.Value()
allContentionEvents = append(allContentionEvents, stats.KV.ContentionEvents...)
}

Expand Down Expand Up @@ -374,6 +379,10 @@ func (a *TraceAnalyzer) ProcessStats() error {
a.queryLevelStats.RUEstimate += estimatedRU
}

for _, cpuTime := range a.nodeLevelStats.CPUTimeGroupedByNode {
a.queryLevelStats.CPUTime += cpuTime
}

a.queryLevelStats.ContentionEvents = allContentionEvents

return errs
Expand Down
Loading

0 comments on commit 2880abb

Please sign in to comment.