diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index 30b72ae9b01b..c4bec5900643 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -485,6 +485,7 @@ go_library( "//pkg/util/errorutil/unimplemented", "//pkg/util/fsm", "//pkg/util/grpcutil", + "//pkg/util/grunning", "//pkg/util/hlc", "//pkg/util/humanizeutil", "//pkg/util/interval", diff --git a/pkg/sql/colflow/BUILD.bazel b/pkg/sql/colflow/BUILD.bazel index 35a4f95c0ec6..2e9a526a8321 100644 --- a/pkg/sql/colflow/BUILD.bazel +++ b/pkg/sql/colflow/BUILD.bazel @@ -48,6 +48,7 @@ go_library( "//pkg/util", "//pkg/util/admission", "//pkg/util/buildutil", + "//pkg/util/grunning", "//pkg/util/log", "//pkg/util/metric", "//pkg/util/mon", diff --git a/pkg/sql/colflow/stats.go b/pkg/sql/colflow/stats.go index 74ec91987d9e..b5b4bf503934 100644 --- a/pkg/sql/colflow/stats.go +++ b/pkg/sql/colflow/stats.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/execstats" "github.com/cockroachdb/cockroach/pkg/util/buildutil" + "github.com/cockroachdb/cockroach/pkg/util/grunning" "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -32,6 +33,7 @@ import ( // colexecop.VectorizedStatsCollector's childStatsCollectors. type childStatsCollector interface { getElapsedTime() time.Duration + getElapsedCPUTime() time.Duration } // batchInfoCollector is a helper used by collector implementations. @@ -64,6 +66,11 @@ type batchInfoCollector struct { // in finishAndGetStats(). stopwatch *timeutil.StopWatch + // cpuStopwatch tracks the amount of CPU time the wrapped operator consumed + // doing work. Similar to stopwatch, this will include all the CPU time that + // the operator's inputs spent doing work in the same goroutine. + cpuStopwatch *timeutil.CPUStopWatch + // childStatsCollectors contains the stats collectors for all of the inputs // to the wrapped operator. childStatsCollectors []childStatsCollector @@ -80,10 +87,15 @@ func makeBatchInfoCollector( if inputWatch == nil { colexecerror.InternalError(errors.AssertionFailedf("input watch is nil")) } + var cpuStopWatch *timeutil.CPUStopWatch + if grunning.Supported() { + cpuStopWatch = &timeutil.CPUStopWatch{} + } return batchInfoCollector{ OneInputNode: colexecop.NewOneInputNode(op), componentID: id, stopwatch: inputWatch, + cpuStopwatch: cpuStopWatch, childStatsCollectors: childStatsCollectors, } } @@ -96,10 +108,12 @@ func (bic *batchInfoCollector) init() { func (bic *batchInfoCollector) Init(ctx context.Context) { bic.ctx = ctx bic.stopwatch.Start() + bic.cpuStopwatch.Start() // Wrap the call to Init() with a panic catcher in order to get the correct // execution time (e.g. in the statement bundle). err := colexecerror.CatchVectorizedRuntimeError(bic.init) bic.stopwatch.Stop() + bic.cpuStopwatch.Stop() if err != nil { colexecerror.InternalError(err) } @@ -119,10 +133,12 @@ func (bic *batchInfoCollector) next() { // Next is part of the colexecop.Operator interface. func (bic *batchInfoCollector) Next() coldata.Batch { bic.stopwatch.Start() + bic.cpuStopwatch.Start() // Wrap the call to Next() with a panic catcher in order to get the correct // execution time (e.g. in the statement bundle). err := colexecerror.CatchVectorizedRuntimeError(bic.next) bic.stopwatch.Stop() + bic.cpuStopwatch.Stop() if err != nil { colexecerror.InternalError(err) } @@ -139,24 +155,29 @@ func (bic *batchInfoCollector) Next() coldata.Batch { // operator. ok indicates whether the stats collection was successful. func (bic *batchInfoCollector) finishAndGetStats() ( numBatches, numTuples uint64, - time time.Duration, + time, cpuTime time.Duration, ok bool, ) { tm := bic.stopwatch.Elapsed() + cpuTm := bic.cpuStopwatch.Elapsed() // Subtract the time spent in each of the child stats collectors, to produce // the amount of time that the wrapped operator spent doing work itself, not // including time spent waiting on its inputs. for _, statsCollectors := range bic.childStatsCollectors { tm -= statsCollectors.getElapsedTime() + cpuTm -= statsCollectors.getElapsedCPUTime() } if buildutil.CrdbTestBuild { if tm < 0 { colexecerror.InternalError(errors.AssertionFailedf("unexpectedly execution time is negative")) } + if cpuTm < 0 { + colexecerror.InternalError(errors.AssertionFailedf("unexpectedly CPU time is negative")) + } } bic.mu.Lock() defer bic.mu.Unlock() - return bic.mu.numBatches, bic.mu.numTuples, tm, bic.mu.initialized + return bic.mu.numBatches, bic.mu.numTuples, tm, cpuTm, bic.mu.initialized } // getElapsedTime implements the childStatsCollector interface. @@ -164,6 +185,11 @@ func (bic *batchInfoCollector) getElapsedTime() time.Duration { return bic.stopwatch.Elapsed() } +// getElapsedCPUTime implements the childStatsCollector interface. +func (bic *batchInfoCollector) getElapsedCPUTime() time.Duration { + return bic.cpuStopwatch.Elapsed() +} + // newVectorizedStatsCollector creates a colexecop.VectorizedStatsCollector // which wraps 'op' that corresponds to a component with either ProcessorID or // StreamID 'id' (with 'idTagKey' distinguishing between the two). 'kvReader' is @@ -203,7 +229,7 @@ type vectorizedStatsCollectorImpl struct { // GetStats is part of the colexecop.VectorizedStatsCollector interface. func (vsc *vectorizedStatsCollectorImpl) GetStats() *execinfrapb.ComponentStats { - numBatches, numTuples, time, ok := vsc.batchInfoCollector.finishAndGetStats() + numBatches, numTuples, time, cpuTime, ok := vsc.batchInfoCollector.finishAndGetStats() if !ok { // The stats collection wasn't successful for some reason, so we will // return an empty object (since nil is not allowed by the contract of @@ -255,6 +281,9 @@ func (vsc *vectorizedStatsCollectorImpl) GetStats() *execinfrapb.ComponentStats } else { s.Exec.ExecTime.Set(time) } + if grunning.Supported() { + s.Exec.CPUTime.Set(cpuTime) + } s.Output.NumBatches.Set(numBatches) s.Output.NumTuples.Set(numTuples) @@ -289,7 +318,7 @@ type networkVectorizedStatsCollectorImpl struct { // GetStats is part of the colexecop.VectorizedStatsCollector interface. func (nvsc *networkVectorizedStatsCollectorImpl) GetStats() *execinfrapb.ComponentStats { - numBatches, numTuples, time, ok := nvsc.batchInfoCollector.finishAndGetStats() + numBatches, numTuples, time, _, ok := nvsc.batchInfoCollector.finishAndGetStats() if !ok { // The stats collection wasn't successful for some reason, so we will // return an empty object (since nil is not allowed by the contract of diff --git a/pkg/sql/execinfrapb/component_stats.go b/pkg/sql/execinfrapb/component_stats.go index 2ee92cb6049f..d4f7d445f3ab 100644 --- a/pkg/sql/execinfrapb/component_stats.go +++ b/pkg/sql/execinfrapb/component_stats.go @@ -177,6 +177,9 @@ func (s *ComponentStats) formatStats(fn func(suffix string, value interface{})) if s.Exec.MaxAllocatedDisk.HasValue() { fn("max sql temp disk usage", humanize.IBytes(s.Exec.MaxAllocatedDisk.Value())) } + if s.Exec.CPUTime.HasValue() { + fn("sql cpu time", humanizeutil.Duration(s.Exec.CPUTime.Value())) + } // Output stats. if s.Output.NumBatches.HasValue() { @@ -267,6 +270,9 @@ func (s *ComponentStats) Union(other *ComponentStats) *ComponentStats { if !result.Exec.ConsumedRU.HasValue() { result.Exec.ConsumedRU = other.Exec.ConsumedRU } + if !result.Exec.CPUTime.HasValue() { + result.Exec.CPUTime = other.Exec.CPUTime + } // Output stats. if !result.Output.NumBatches.HasValue() { @@ -360,6 +366,7 @@ func (s *ComponentStats) MakeDeterministic() { resetUint(&s.Exec.MaxAllocatedMem) resetUint(&s.Exec.MaxAllocatedDisk) resetUint(&s.Exec.ConsumedRU) + timeVal(&s.Exec.CPUTime) // Output. resetUint(&s.Output.NumBatches) diff --git a/pkg/sql/execinfrapb/component_stats.proto b/pkg/sql/execinfrapb/component_stats.proto index 8549376685c6..8f447b8d85ee 100644 --- a/pkg/sql/execinfrapb/component_stats.proto +++ b/pkg/sql/execinfrapb/component_stats.proto @@ -152,6 +152,8 @@ message ExecStats { optional util.optional.Uint max_allocated_disk = 3 [(gogoproto.nullable) = false]; // Amount of RUs consumed while executing the component. optional util.optional.Uint consumed_r_u = 4 [(gogoproto.nullable) = false]; + // CPU time spent executing the component. + optional util.optional.Duration c_p_u_time = 5 [(gogoproto.nullable) = false]; } // OutputStats contains statistics about the output (results) of a component. diff --git a/pkg/sql/execstats/traceanalyzer.go b/pkg/sql/execstats/traceanalyzer.go index eaaf8208f284..8950bfa31cbf 100644 --- a/pkg/sql/execstats/traceanalyzer.go +++ b/pkg/sql/execstats/traceanalyzer.go @@ -110,6 +110,7 @@ type NodeLevelStats struct { NetworkMessagesGroupedByNode map[base.SQLInstanceID]int64 ContentionTimeGroupedByNode map[base.SQLInstanceID]time.Duration RUEstimateGroupedByNode map[base.SQLInstanceID]int64 + CPUTimeGroupedByNode map[base.SQLInstanceID]time.Duration } // QueryLevelStats returns all the query level stats that correspond to the @@ -127,6 +128,7 @@ type QueryLevelStats struct { ContentionTime time.Duration ContentionEvents []roachpb.ContentionEvent RUEstimate int64 + CPUTime time.Duration } // QueryLevelStatsWithErr is the same as QueryLevelStats, but also tracks @@ -162,6 +164,7 @@ func (s *QueryLevelStats) Accumulate(other QueryLevelStats) { s.ContentionTime += other.ContentionTime s.ContentionEvents = append(s.ContentionEvents, other.ContentionEvents...) s.RUEstimate += other.RUEstimate + s.CPUTime += other.CPUTime } // TraceAnalyzer is a struct that helps calculate top-level statistics from a @@ -238,6 +241,7 @@ func (a *TraceAnalyzer) ProcessStats() error { NetworkMessagesGroupedByNode: make(map[base.SQLInstanceID]int64), ContentionTimeGroupedByNode: make(map[base.SQLInstanceID]time.Duration), RUEstimateGroupedByNode: make(map[base.SQLInstanceID]int64), + CPUTimeGroupedByNode: make(map[base.SQLInstanceID]time.Duration), } var errs error @@ -254,6 +258,7 @@ func (a *TraceAnalyzer) ProcessStats() error { a.nodeLevelStats.KVTimeGroupedByNode[instanceID] += stats.KV.KVTime.Value() a.nodeLevelStats.ContentionTimeGroupedByNode[instanceID] += stats.KV.ContentionTime.Value() a.nodeLevelStats.RUEstimateGroupedByNode[instanceID] += int64(stats.Exec.ConsumedRU.Value()) + a.nodeLevelStats.CPUTimeGroupedByNode[instanceID] += stats.Exec.CPUTime.Value() allContentionEvents = append(allContentionEvents, stats.KV.ContentionEvents...) } @@ -374,6 +379,10 @@ func (a *TraceAnalyzer) ProcessStats() error { a.queryLevelStats.RUEstimate += estimatedRU } + for _, cpuTime := range a.nodeLevelStats.CPUTimeGroupedByNode { + a.queryLevelStats.CPUTime += cpuTime + } + a.queryLevelStats.ContentionEvents = allContentionEvents return errs diff --git a/pkg/sql/instrumentation.go b/pkg/sql/instrumentation.go index 25f599e414f1..4d769afc19ae 100644 --- a/pkg/sql/instrumentation.go +++ b/pkg/sql/instrumentation.go @@ -40,6 +40,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/buildutil" + "github.com/cockroachdb/cockroach/pkg/util/grunning" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" @@ -551,6 +552,9 @@ func (ih *instrumentationHelper) emitExplainAnalyzePlanToOutputBuilder( ob.AddMaxMemUsage(queryStats.MaxMemUsage) ob.AddNetworkStats(queryStats.NetworkMessages, queryStats.NetworkBytesSent) ob.AddMaxDiskUsage(queryStats.MaxDiskUsage) + if grunning.Supported() { + ob.AddCPUTime(queryStats.CPUTime) + } if ih.isTenant && ih.outputMode != unmodifiedOutput && ih.vectorized { // Only output RU estimate if this is a tenant running EXPLAIN ANALYZE. // Additionally, RUs aren't correctly propagated in all cases for plans @@ -692,6 +696,7 @@ func (m execNodeTraceMetadata) annotateExplain( nodeStats.VectorizedBatchCount.MaybeAdd(stats.Output.NumBatches) nodeStats.MaxAllocatedMem.MaybeAdd(stats.Exec.MaxAllocatedMem) nodeStats.MaxAllocatedDisk.MaybeAdd(stats.Exec.MaxAllocatedDisk) + nodeStats.SQLCPUTime.MaybeAdd(stats.Exec.CPUTime) } // If we didn't get statistics for all processors, we don't show the // incomplete results. In the future, we may consider an incomplete flag diff --git a/pkg/sql/opt/exec/explain/emit.go b/pkg/sql/opt/exec/explain/emit.go index 8caaabf02dcc..68f4fe81cb77 100644 --- a/pkg/sql/opt/exec/explain/emit.go +++ b/pkg/sql/opt/exec/explain/emit.go @@ -424,6 +424,9 @@ func (e *emitter) emitNodeAttributes(n *Node) error { if s.MaxAllocatedDisk.HasValue() { e.ob.AddField("estimated max sql temp disk usage", humanize.IBytes(s.MaxAllocatedDisk.Value())) } + if s.SQLCPUTime.HasValue() { + e.ob.AddField("sql cpu time", string(humanizeutil.Duration(s.SQLCPUTime.Value()))) + } if e.ob.flags.Verbose { if s.StepCount.HasValue() { e.ob.AddField("MVCC step count (ext/int)", fmt.Sprintf("%s/%s", diff --git a/pkg/sql/opt/exec/explain/output.go b/pkg/sql/opt/exec/explain/output.go index a408fce7578b..a3a338ebcb1e 100644 --- a/pkg/sql/opt/exec/explain/output.go +++ b/pkg/sql/opt/exec/explain/output.go @@ -361,6 +361,16 @@ func (ob *OutputBuilder) AddMaxDiskUsage(bytes int64) { } } +// AddCPUTime adds a top-level field for the cumulative cpu time spent by SQL +// execution. +func (ob *OutputBuilder) AddCPUTime(cpuTime time.Duration) { + ob.AddRedactableTopLevelField( + RedactVolatile, + "sql cpu time", + string(humanizeutil.Duration(cpuTime)), + ) +} + // AddRUEstimate adds a top-level field for the estimated number of RUs consumed // by the query. func (ob *OutputBuilder) AddRUEstimate(ru int64) { diff --git a/pkg/sql/opt/exec/factory.go b/pkg/sql/opt/exec/factory.go index 989a2db9cb2f..ffb6e1e880c1 100644 --- a/pkg/sql/opt/exec/factory.go +++ b/pkg/sql/opt/exec/factory.go @@ -343,6 +343,7 @@ type ExecutionStats struct { MaxAllocatedMem optional.Uint MaxAllocatedDisk optional.Uint + SQLCPUTime optional.Duration // Nodes on which this operator was executed. Nodes []string diff --git a/pkg/util/timeutil/BUILD.bazel b/pkg/util/timeutil/BUILD.bazel index 9cbe9b1b811c..3e901ec9c8b4 100644 --- a/pkg/util/timeutil/BUILD.bazel +++ b/pkg/util/timeutil/BUILD.bazel @@ -4,6 +4,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "timeutil", srcs = [ + "cpustopwatch.go", "manual_time.go", "stopwatch.go", "time.go", @@ -17,6 +18,7 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/util/timeutil", visibility = ["//visibility:public"], deps = [ + "//pkg/util/grunning", "//pkg/util/syncutil", "@com_github_cockroachdb_errors//:errors", ], diff --git a/pkg/util/timeutil/cpustopwatch.go b/pkg/util/timeutil/cpustopwatch.go new file mode 100644 index 000000000000..990811a7b516 --- /dev/null +++ b/pkg/util/timeutil/cpustopwatch.go @@ -0,0 +1,47 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package timeutil + +import ( + "time" + + "github.com/cockroachdb/cockroach/pkg/util/grunning" +) + +// CPUStopWatch is a utility stop watch for measuring CPU time spent by a +// component. It can be safely started and stopped multiple times, but is +// not safe to use concurrently. If CPUStopWatch is nil, all operations are +// no-ops. +type CPUStopWatch struct { + startedAt time.Duration + elapsed time.Duration +} + +func (w *CPUStopWatch) Start() { + if w == nil { + return + } + w.startedAt = grunning.Time() +} + +func (w *CPUStopWatch) Stop() { + if w == nil { + return + } + w.elapsed += grunning.Difference(w.startedAt, grunning.Time()) +} + +func (w *CPUStopWatch) Elapsed() time.Duration { + if w == nil { + return 0 + } + return w.elapsed +}