diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index 590d64225ad9..8d6b48f60cd1 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -490,6 +490,7 @@ go_library( "//pkg/util/errorutil/unimplemented", "//pkg/util/fsm", "//pkg/util/grpcutil", + "//pkg/util/grunning", "//pkg/util/hlc", "//pkg/util/humanizeutil", "//pkg/util/interval", diff --git a/pkg/sql/colexecop/operator.go b/pkg/sql/colexecop/operator.go index 554f29a10965..68094182164a 100644 --- a/pkg/sql/colexecop/operator.go +++ b/pkg/sql/colexecop/operator.go @@ -79,6 +79,10 @@ type KVReader interface { // GetScanStats returns statistics about the scan that happened during the // KV reads. It must be safe for concurrent use. GetScanStats() execstats.ScanStats + // GetKVCPUTime returns the CPU time consumed *on the current goroutine* by + // KV requests. It must be safe for concurrent use. It is used to calculate + // the SQL CPU time. + GetKVCPUTime() time.Duration } // ZeroInputNode is an execopnode.OpNode with no inputs. diff --git a/pkg/sql/colfetcher/cfetcher.go b/pkg/sql/colfetcher/cfetcher.go index d867951a1fe9..c743df7047f4 100644 --- a/pkg/sql/colfetcher/cfetcher.go +++ b/pkg/sql/colfetcher/cfetcher.go @@ -17,6 +17,7 @@ import ( "sort" "strings" "sync" + "time" "github.com/cockroachdb/apd/v3" "github.com/cockroachdb/cockroach/pkg/col/coldata" @@ -220,15 +221,18 @@ type cFetcher struct { nextKVer storage.NextKVer // fetcher, if set, is the same object as nextKVer. fetcher *row.KVFetcher - // bytesRead and batchRequestsIssued store the total number of bytes read - // and of BatchRequests issued, respectively, by this cFetcher throughout - // its lifetime in case when the underlying row.KVFetcher has already been - // closed and nil-ed out. - // - // The fields should not be accessed directly by the users of the cFetcher - - // getBytesRead() and getBatchRequestsIssued() should be used instead. 
- bytesRead int64 + // bytesRead stores the total number of bytes read by this cFetcher throughout + // its lifetime after the underlying row.KVFetcher has been closed and nil-ed + // out. It should only be accessed through getBytesRead(). + bytesRead int64 + // batchRequestsIssued stores the total number of BatchRequests issued by this + // cFetcher after the underlying row.KVFetcher has been closed and nil-ed out. + // It can only be accessed through getBatchRequestsIssued(). batchRequestsIssued int64 + // kvCPUTime stores the CPU time spent by this cFetcher while fulfilling KV + // requests *in the current goroutine* after the underlying row.KVFetcher has + // been closed and nil-ed out. It can only be accessed through getKVCPUTime(). + kvCPUTime time.Duration // machine contains fields that get updated during the run of the fetcher. machine struct { @@ -1292,6 +1296,13 @@ func (cf *cFetcher) getBytesRead() int64 { return cf.bytesRead } +func (cf *cFetcher) getKVCPUTime() time.Duration { + if cf.fetcher != nil { + return cf.fetcher.GetKVCPUTime() + } + return cf.kvCPUTime +} + // getBatchRequestsIssued returns the number of BatchRequests issued by the // cFetcher throughout its lifetime so far. func (cf *cFetcher) getBatchRequestsIssued() int64 { @@ -1327,6 +1338,7 @@ func (cf *cFetcher) Close(ctx context.Context) { if cf.fetcher != nil { cf.bytesRead = cf.fetcher.GetBytesRead() cf.batchRequestsIssued = cf.fetcher.GetBatchRequestsIssued() + cf.kvCPUTime = cf.fetcher.GetKVCPUTime() cf.fetcher.Close(ctx) cf.fetcher = nil } diff --git a/pkg/sql/colfetcher/colbatch_scan.go b/pkg/sql/colfetcher/colbatch_scan.go index 708a2474fe46..f84aabe4ca1b 100644 --- a/pkg/sql/colfetcher/colbatch_scan.go +++ b/pkg/sql/colfetcher/colbatch_scan.go @@ -175,6 +175,13 @@ func (s *ColBatchScan) GetScanStats() execstats.ScanStats { return execstats.GetScanStats(s.Ctx, nil /* recording */) } +// GetKVCPUTime is part of the colexecop.KVReader interface. 
+func (s *ColBatchScan) GetKVCPUTime() time.Duration { + s.mu.Lock() + defer s.mu.Unlock() + return s.cf.getKVCPUTime() +} + var colBatchScanPool = sync.Pool{ New: func() interface{} { return &ColBatchScan{} diff --git a/pkg/sql/colfetcher/index_join.go b/pkg/sql/colfetcher/index_join.go index a3e187a65dae..dfed9308934d 100644 --- a/pkg/sql/colfetcher/index_join.go +++ b/pkg/sql/colfetcher/index_join.go @@ -400,6 +400,13 @@ func (s *ColIndexJoin) GetBatchRequestsIssued() int64 { return s.cf.getBatchRequestsIssued() } +// GetKVCPUTime is part of the colexecop.KVReader interface. +func (s *ColIndexJoin) GetKVCPUTime() time.Duration { + s.mu.Lock() + defer s.mu.Unlock() + return s.cf.getKVCPUTime() +} + // GetContentionInfo is part of the colexecop.KVReader interface. func (s *ColIndexJoin) GetContentionInfo() (time.Duration, []roachpb.ContentionEvent) { return execstats.GetCumulativeContentionTime(s.Ctx, nil /* recording */) diff --git a/pkg/sql/colflow/BUILD.bazel b/pkg/sql/colflow/BUILD.bazel index 69b3de1ce0f3..5908eb10ceba 100644 --- a/pkg/sql/colflow/BUILD.bazel +++ b/pkg/sql/colflow/BUILD.bazel @@ -48,6 +48,7 @@ go_library( "//pkg/util", "//pkg/util/admission", "//pkg/util/buildutil", + "//pkg/util/grunning", "//pkg/util/log", "//pkg/util/metric", "//pkg/util/mon", diff --git a/pkg/sql/colflow/stats.go b/pkg/sql/colflow/stats.go index 74ec91987d9e..6da9873623bc 100644 --- a/pkg/sql/colflow/stats.go +++ b/pkg/sql/colflow/stats.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/execstats" "github.com/cockroachdb/cockroach/pkg/util/buildutil" + "github.com/cockroachdb/cockroach/pkg/util/grunning" "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -32,6 +33,7 @@ import ( // colexecop.VectorizedStatsCollector's childStatsCollectors. 
type childStatsCollector interface { getElapsedTime() time.Duration + getElapsedCPUTime() time.Duration } // batchInfoCollector is a helper used by collector implementations. @@ -139,24 +141,29 @@ func (bic *batchInfoCollector) Next() coldata.Batch { // operator. ok indicates whether the stats collection was successful. func (bic *batchInfoCollector) finishAndGetStats() ( numBatches, numTuples uint64, - time time.Duration, + time, cpuTime time.Duration, ok bool, ) { tm := bic.stopwatch.Elapsed() + cpuTm := bic.stopwatch.ElapsedCPU() // Subtract the time spent in each of the child stats collectors, to produce // the amount of time that the wrapped operator spent doing work itself, not // including time spent waiting on its inputs. for _, statsCollectors := range bic.childStatsCollectors { tm -= statsCollectors.getElapsedTime() + cpuTm -= statsCollectors.getElapsedCPUTime() } if buildutil.CrdbTestBuild { if tm < 0 { colexecerror.InternalError(errors.AssertionFailedf("unexpectedly execution time is negative")) } + if cpuTm < 0 { + colexecerror.InternalError(errors.AssertionFailedf("unexpectedly CPU time is negative")) + } } bic.mu.Lock() defer bic.mu.Unlock() - return bic.mu.numBatches, bic.mu.numTuples, tm, bic.mu.initialized + return bic.mu.numBatches, bic.mu.numTuples, tm, cpuTm, bic.mu.initialized } // getElapsedTime implements the childStatsCollector interface. @@ -164,6 +171,11 @@ func (bic *batchInfoCollector) getElapsedTime() time.Duration { return bic.stopwatch.Elapsed() } +// getElapsedCPUTime implements the childStatsCollector interface. +func (bic *batchInfoCollector) getElapsedCPUTime() time.Duration { + return bic.stopwatch.ElapsedCPU() +} + // newVectorizedStatsCollector creates a colexecop.VectorizedStatsCollector // which wraps 'op' that corresponds to a component with either ProcessorID or // StreamID 'id' (with 'idTagKey' distinguishing between the two). 
'kvReader' is @@ -203,7 +215,7 @@ type vectorizedStatsCollectorImpl struct { // GetStats is part of the colexecop.VectorizedStatsCollector interface. func (vsc *vectorizedStatsCollectorImpl) GetStats() *execinfrapb.ComponentStats { - numBatches, numTuples, time, ok := vsc.batchInfoCollector.finishAndGetStats() + numBatches, numTuples, time, cpuTime, ok := vsc.batchInfoCollector.finishAndGetStats() if !ok { // The stats collection wasn't successful for some reason, so we will // return an empty object (since nil is not allowed by the contract of @@ -221,6 +233,12 @@ func (vsc *vectorizedStatsCollectorImpl) GetStats() *execinfrapb.ComponentStats var s *execinfrapb.ComponentStats if vsc.columnarizer != nil { s = vsc.columnarizer.GetStats() + + // If the columnarizer is wrapping an operator that performs KV operations, + // we must subtract the CPU time spent performing KV work on a SQL goroutine + // from the measured CPU time. If the wrapped operator does not perform KV + // operations, this value will be zero. + cpuTime -= s.KV.KVCPUTime.Value() } else { // There was no root columnarizer, so create a new stats object. s = &execinfrapb.ComponentStats{Component: vsc.componentID} @@ -252,9 +270,16 @@ func (vsc *vectorizedStatsCollectorImpl) GetStats() *execinfrapb.ComponentStats scanStats := vsc.kvReader.GetScanStats() execstats.PopulateKVMVCCStats(&s.KV, &scanStats) s.Exec.ConsumedRU.Set(scanStats.ConsumedRU) + + // In order to account for SQL CPU time, we have to subtract the CPU time + // spent while serving KV requests on a SQL goroutine. + cpuTime -= vsc.kvReader.GetKVCPUTime() } else { s.Exec.ExecTime.Set(time) } + if cpuTime > 0 && grunning.Supported() { + s.Exec.CPUTime.Set(cpuTime) + } s.Output.NumBatches.Set(numBatches) s.Output.NumTuples.Set(numTuples) @@ -289,7 +314,7 @@ type networkVectorizedStatsCollectorImpl struct { // GetStats is part of the colexecop.VectorizedStatsCollector interface. 
func (nvsc *networkVectorizedStatsCollectorImpl) GetStats() *execinfrapb.ComponentStats { - numBatches, numTuples, time, ok := nvsc.batchInfoCollector.finishAndGetStats() + numBatches, numTuples, time, _, ok := nvsc.batchInfoCollector.finishAndGetStats() if !ok { // The stats collection wasn't successful for some reason, so we will // return an empty object (since nil is not allowed by the contract of diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go index 82c57e4777ea..deb67669c0b0 100644 --- a/pkg/sql/colflow/vectorized_flow.go +++ b/pkg/sql/colflow/vectorized_flow.go @@ -409,7 +409,7 @@ func (s *vectorizedFlowCreator) wrapWithVectorizedStatsCollectorBase( component execinfrapb.ComponentID, monitors []*mon.BytesMonitor, ) error { - inputWatch := timeutil.NewStopWatch() + inputWatch := timeutil.NewStopWatchWithCPU() var memMonitors, diskMonitors []*mon.BytesMonitor for _, m := range monitors { if m.Resource() == mon.DiskResource { diff --git a/pkg/sql/execinfrapb/component_stats.go b/pkg/sql/execinfrapb/component_stats.go index 7cdba6f51796..daa186193fb7 100644 --- a/pkg/sql/execinfrapb/component_stats.go +++ b/pkg/sql/execinfrapb/component_stats.go @@ -177,6 +177,9 @@ func (s *ComponentStats) formatStats(fn func(suffix string, value interface{})) if s.Exec.MaxAllocatedDisk.HasValue() { fn("max sql temp disk usage", humanize.IBytes(s.Exec.MaxAllocatedDisk.Value())) } + if s.Exec.CPUTime.HasValue() { + fn("sql cpu time", humanizeutil.Duration(s.Exec.CPUTime.Value())) + } // Output stats. if s.Output.NumBatches.HasValue() { @@ -267,6 +270,9 @@ func (s *ComponentStats) Union(other *ComponentStats) *ComponentStats { if !result.Exec.ConsumedRU.HasValue() { result.Exec.ConsumedRU = other.Exec.ConsumedRU } + if !result.Exec.CPUTime.HasValue() { + result.Exec.CPUTime = other.Exec.CPUTime + } // Output stats. 
if !result.Output.NumBatches.HasValue() { @@ -360,6 +366,7 @@ func (s *ComponentStats) MakeDeterministic() { resetUint(&s.Exec.MaxAllocatedMem) resetUint(&s.Exec.MaxAllocatedDisk) resetUint(&s.Exec.ConsumedRU) + timeVal(&s.Exec.CPUTime) // Output. resetUint(&s.Output.NumBatches) diff --git a/pkg/sql/execinfrapb/component_stats.proto b/pkg/sql/execinfrapb/component_stats.proto index 8549376685c6..26f04999e5fe 100644 --- a/pkg/sql/execinfrapb/component_stats.proto +++ b/pkg/sql/execinfrapb/component_stats.proto @@ -140,6 +140,13 @@ message KVStats { // contention_events hit at the statement level. repeated cockroach.roachpb.ContentionEvent contention_events = 10 [(gogoproto.nullable) = false]; + + // Cumulative CPU time spent fulfilling KV requests on a SQL goroutine. This + // does not include CPU time spent on a remote node, but does include time + // spent on a Scan's goroutine. This is only used to calculate SQL CPU time + // for processors in the row-engine that perform KV requests. + optional util.optional.Duration kv_cpu_time = 11 [(gogoproto.nullable) = false, + (gogoproto.customname) = "KVCPUTime"]; } // ExecStats contains statistics about the execution of a component. @@ -151,7 +158,11 @@ message ExecStats { // Maximum scratch disk allocated by the component. optional util.optional.Uint max_allocated_disk = 3 [(gogoproto.nullable) = false]; // Amount of RUs consumed while executing the component. - optional util.optional.Uint consumed_r_u = 4 [(gogoproto.nullable) = false]; + optional util.optional.Uint consumed_ru = 4 [(gogoproto.nullable) = false, + (gogoproto.customname) = "ConsumedRU"]; + // CPU time spent executing the component. + optional util.optional.Duration cpu_time = 5 [(gogoproto.nullable) = false, + (gogoproto.customname) = "CPUTime"]; } // OutputStats contains statistics about the output (results) of a component. 
@@ -169,5 +180,6 @@ message FlowStats { optional util.optional.Uint max_disk_usage = 2 [(gogoproto.nullable) = false]; // Amount of RUs consumed due to CPU usage while executing the flow. Currently // only used for remote flows. - optional util.optional.Uint consumed_r_u = 3 [(gogoproto.nullable) = false]; + optional util.optional.Uint consumed_ru = 3 [(gogoproto.nullable) = false, + (gogoproto.customname) = "ConsumedRU"]; } diff --git a/pkg/sql/execstats/traceanalyzer.go b/pkg/sql/execstats/traceanalyzer.go index eaaf8208f284..8950bfa31cbf 100644 --- a/pkg/sql/execstats/traceanalyzer.go +++ b/pkg/sql/execstats/traceanalyzer.go @@ -110,6 +110,7 @@ type NodeLevelStats struct { NetworkMessagesGroupedByNode map[base.SQLInstanceID]int64 ContentionTimeGroupedByNode map[base.SQLInstanceID]time.Duration RUEstimateGroupedByNode map[base.SQLInstanceID]int64 + CPUTimeGroupedByNode map[base.SQLInstanceID]time.Duration } // QueryLevelStats returns all the query level stats that correspond to the @@ -127,6 +128,7 @@ type QueryLevelStats struct { ContentionTime time.Duration ContentionEvents []roachpb.ContentionEvent RUEstimate int64 + CPUTime time.Duration } // QueryLevelStatsWithErr is the same as QueryLevelStats, but also tracks @@ -162,6 +164,7 @@ func (s *QueryLevelStats) Accumulate(other QueryLevelStats) { s.ContentionTime += other.ContentionTime s.ContentionEvents = append(s.ContentionEvents, other.ContentionEvents...) 
s.RUEstimate += other.RUEstimate + s.CPUTime += other.CPUTime } // TraceAnalyzer is a struct that helps calculate top-level statistics from a @@ -238,6 +241,7 @@ func (a *TraceAnalyzer) ProcessStats() error { NetworkMessagesGroupedByNode: make(map[base.SQLInstanceID]int64), ContentionTimeGroupedByNode: make(map[base.SQLInstanceID]time.Duration), RUEstimateGroupedByNode: make(map[base.SQLInstanceID]int64), + CPUTimeGroupedByNode: make(map[base.SQLInstanceID]time.Duration), } var errs error @@ -254,6 +258,7 @@ func (a *TraceAnalyzer) ProcessStats() error { a.nodeLevelStats.KVTimeGroupedByNode[instanceID] += stats.KV.KVTime.Value() a.nodeLevelStats.ContentionTimeGroupedByNode[instanceID] += stats.KV.ContentionTime.Value() a.nodeLevelStats.RUEstimateGroupedByNode[instanceID] += int64(stats.Exec.ConsumedRU.Value()) + a.nodeLevelStats.CPUTimeGroupedByNode[instanceID] += stats.Exec.CPUTime.Value() allContentionEvents = append(allContentionEvents, stats.KV.ContentionEvents...) } @@ -374,6 +379,10 @@ func (a *TraceAnalyzer) ProcessStats() error { a.queryLevelStats.RUEstimate += estimatedRU } + for _, cpuTime := range a.nodeLevelStats.CPUTimeGroupedByNode { + a.queryLevelStats.CPUTime += cpuTime + } + a.queryLevelStats.ContentionEvents = allContentionEvents return errs diff --git a/pkg/sql/instrumentation.go b/pkg/sql/instrumentation.go index 0da3a28be8d4..63e593d5f486 100644 --- a/pkg/sql/instrumentation.go +++ b/pkg/sql/instrumentation.go @@ -41,6 +41,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/buildutil" "github.com/cockroachdb/cockroach/pkg/util/fsm" + "github.com/cockroachdb/cockroach/pkg/util/grunning" "github.com/cockroachdb/cockroach/pkg/util/intsets" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/tracing" @@ -131,9 +132,10 @@ type instrumentationHelper struct { // serialized version of the plan will be returned via PlanForStats(). 
savePlanForStats bool - explainPlan *explain.Plan - distribution physicalplan.PlanDistribution - vectorized bool + explainPlan *explain.Plan + distribution physicalplan.PlanDistribution + vectorized bool + containsMutation bool traceMetadata execNodeTraceMetadata @@ -505,10 +507,11 @@ func (ih *instrumentationHelper) RecordExplainPlan(explainPlan *explain.Plan) { // RecordPlanInfo records top-level information about the plan. func (ih *instrumentationHelper) RecordPlanInfo( - distribution physicalplan.PlanDistribution, vectorized bool, + distribution physicalplan.PlanDistribution, vectorized, containsMutation bool, ) { ih.distribution = distribution ih.vectorized = vectorized + ih.containsMutation = containsMutation } // PlanForStats returns the plan as an ExplainTreePlanNode tree, if it was @@ -561,11 +564,17 @@ func (ih *instrumentationHelper) emitExplainAnalyzePlanToOutputBuilder( ob.AddMaxMemUsage(queryStats.MaxMemUsage) ob.AddNetworkStats(queryStats.NetworkMessages, queryStats.NetworkBytesSent) ob.AddMaxDiskUsage(queryStats.MaxDiskUsage) - if ih.isTenant && ih.outputMode != unmodifiedOutput && ih.vectorized { - // Only output RU estimate if this is a tenant running EXPLAIN ANALYZE. - // Additionally, RUs aren't correctly propagated in all cases for plans - // that aren't vectorized - for example, EXPORT statements. For now, - // only output RU estimates for vectorized plans. + if !ih.containsMutation && ih.vectorized && grunning.Supported() { + // Currently we cannot separate SQL CPU time from local KV CPU time for + // mutations, since they do not collect statistics. Additionally, CPU time + // is only collected for vectorized plans. + ob.AddCPUTime(queryStats.CPUTime) + } + if ih.isTenant && ih.vectorized { + // Only output RU estimate if this is a tenant. Additionally, RUs aren't + // correctly propagated in all cases for plans that aren't vectorized - + // for example, EXPORT statements. For now, only output RU estimates for + // vectorized plans. 
ob.AddRUEstimate(queryStats.RUEstimate) } } @@ -670,6 +679,8 @@ func (m execNodeTraceMetadata) annotateExplain( } } + noMutations := !p.curPlan.flags.IsSet(planFlagContainsMutation) + var walk func(n *explain.Node) walk = func(n *explain.Node) { wrapped := n.WrappedNode() @@ -702,6 +713,11 @@ func (m execNodeTraceMetadata) annotateExplain( nodeStats.VectorizedBatchCount.MaybeAdd(stats.Output.NumBatches) nodeStats.MaxAllocatedMem.MaybeAdd(stats.Exec.MaxAllocatedMem) nodeStats.MaxAllocatedDisk.MaybeAdd(stats.Exec.MaxAllocatedDisk) + if noMutations { + // Currently we cannot separate SQL CPU time from local KV CPU time + // for mutations, since they do not collect statistics. + nodeStats.SQLCPUTime.MaybeAdd(stats.Exec.CPUTime) + } } // If we didn't get statistics for all processors, we don't show the // incomplete results. In the future, we may consider an incomplete flag diff --git a/pkg/sql/opt/exec/explain/emit.go b/pkg/sql/opt/exec/explain/emit.go index 8caaabf02dcc..68f4fe81cb77 100644 --- a/pkg/sql/opt/exec/explain/emit.go +++ b/pkg/sql/opt/exec/explain/emit.go @@ -424,6 +424,9 @@ func (e *emitter) emitNodeAttributes(n *Node) error { if s.MaxAllocatedDisk.HasValue() { e.ob.AddField("estimated max sql temp disk usage", humanize.IBytes(s.MaxAllocatedDisk.Value())) } + if s.SQLCPUTime.HasValue() { + e.ob.AddField("sql cpu time", string(humanizeutil.Duration(s.SQLCPUTime.Value()))) + } if e.ob.flags.Verbose { if s.StepCount.HasValue() { e.ob.AddField("MVCC step count (ext/int)", fmt.Sprintf("%s/%s", diff --git a/pkg/sql/opt/exec/explain/output.go b/pkg/sql/opt/exec/explain/output.go index a408fce7578b..a3a338ebcb1e 100644 --- a/pkg/sql/opt/exec/explain/output.go +++ b/pkg/sql/opt/exec/explain/output.go @@ -361,6 +361,16 @@ func (ob *OutputBuilder) AddMaxDiskUsage(bytes int64) { } } +// AddCPUTime adds a top-level field for the cumulative cpu time spent by SQL +// execution. 
+func (ob *OutputBuilder) AddCPUTime(cpuTime time.Duration) { + ob.AddRedactableTopLevelField( + RedactVolatile, + "sql cpu time", + string(humanizeutil.Duration(cpuTime)), + ) +} + // AddRUEstimate adds a top-level field for the estimated number of RUs consumed // by the query. func (ob *OutputBuilder) AddRUEstimate(ru int64) { diff --git a/pkg/sql/opt/exec/factory.go b/pkg/sql/opt/exec/factory.go index aaf7ed5c0ac9..254019c9de6a 100644 --- a/pkg/sql/opt/exec/factory.go +++ b/pkg/sql/opt/exec/factory.go @@ -411,6 +411,7 @@ type ExecutionStats struct { MaxAllocatedMem optional.Uint MaxAllocatedDisk optional.Uint + SQLCPUTime optional.Duration // Nodes on which this operator was executed. Nodes []string diff --git a/pkg/sql/plan.go b/pkg/sql/plan.go index 8a0ccaf4c59e..c13915718b8c 100644 --- a/pkg/sql/plan.go +++ b/pkg/sql/plan.go @@ -489,7 +489,8 @@ func (p *planTop) savePlanInfo(ctx context.Context) { } else if p.flags.IsSet(planFlagPartiallyDistributed) { distribution = physicalplan.PartiallyDistributedPlan } - p.instrumentation.RecordPlanInfo(distribution, vectorized) + containsMutation := p.flags.IsSet(planFlagContainsMutation) + p.instrumentation.RecordPlanInfo(distribution, vectorized, containsMutation) } // startExec calls startExec() on each planNode using a depth-first, post-order diff --git a/pkg/sql/row/fetcher.go b/pkg/sql/row/fetcher.go index e6b95eee4b88..536d9ff71477 100644 --- a/pkg/sql/row/fetcher.go +++ b/pkg/sql/row/fetcher.go @@ -108,6 +108,12 @@ type KVBatchFetcher interface { // able to handle a case of uninitialized fetcher. GetBatchRequestsIssued() int64 + // GetKVCPUTime returns the amount of CPU time spent by this fetcher *on the + // current goroutine* while serving KV requests. It is safe for concurrent use + // and is able to handle a case of uninitialized fetcher. It is used in + // calculating SQL CPU time. + GetKVCPUTime() time.Duration + // Close releases the resources of this KVBatchFetcher. 
Must be called once // the fetcher is no longer in use. Close(ctx context.Context) @@ -1294,3 +1300,10 @@ func (rf *Fetcher) GetBatchRequestsIssued() int64 { } return rf.kvFetcher.GetBatchRequestsIssued() } + +func (rf *Fetcher) GetKVCPUTime() time.Duration { + if rf == nil || rf.kvFetcher == nil || rf.args.WillUseKVProvider { + return 0 + } + return rf.kvFetcher.GetKVCPUTime() +} diff --git a/pkg/sql/row/kv_batch_fetcher.go b/pkg/sql/row/kv_batch_fetcher.go index 5b76766799c9..953073d62701 100644 --- a/pkg/sql/row/kv_batch_fetcher.go +++ b/pkg/sql/row/kv_batch_fetcher.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/mon" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" ) @@ -751,6 +752,9 @@ type kvBatchFetcherHelper struct { bytesRead int64 batchRequestsIssued *int64 } + // cpuStopWatch tracks CPU usage for the current goroutine. It is safe for + // concurrent use. + cpuStopWatch *timeutil.CPUStopWatch } func (h *kvBatchFetcherHelper) init( @@ -758,11 +762,14 @@ func (h *kvBatchFetcherHelper) init( ) { h.nextBatch = nextBatch h.atomics.batchRequestsIssued = batchRequestsIssued + h.cpuStopWatch = timeutil.NewCPUStopWatch() } // NextBatch implements the KVBatchFetcher interface. func (h *kvBatchFetcherHelper) NextBatch(ctx context.Context) (KVBatchFetcherResponse, error) { + h.cpuStopWatch.Start() resp, err := h.nextBatch(ctx) + h.cpuStopWatch.Stop() if !resp.MoreKVs || err != nil { return resp, err } @@ -790,3 +797,11 @@ func (h *kvBatchFetcherHelper) GetBatchRequestsIssued() int64 { } return atomic.LoadInt64(h.atomics.batchRequestsIssued) } + +// GetKVCPUTime implements the KVBatchFetcher interface. 
+func (h *kvBatchFetcherHelper) GetKVCPUTime() time.Duration { + if h == nil { + return 0 + } + return h.cpuStopWatch.Elapsed() +} diff --git a/pkg/sql/row/kv_fetcher.go b/pkg/sql/row/kv_fetcher.go index 847f813b71af..1be6a4380b02 100644 --- a/pkg/sql/row/kv_fetcher.go +++ b/pkg/sql/row/kv_fetcher.go @@ -324,5 +324,10 @@ func (f *KVProvider) GetBatchRequestsIssued() int64 { return 0 } +// GetKVCPUTime implements the KVBatchFetcher interface. +func (f *KVProvider) GetKVCPUTime() time.Duration { + return 0 +} + // Close implements the KVBatchFetcher interface. func (f *KVProvider) Close(context.Context) {} diff --git a/pkg/sql/rowexec/inverted_joiner.go b/pkg/sql/rowexec/inverted_joiner.go index 16c964a29381..db95faa1be57 100644 --- a/pkg/sql/rowexec/inverted_joiner.go +++ b/pkg/sql/rowexec/inverted_joiner.go @@ -774,6 +774,7 @@ func (ij *invertedJoiner) execStatsForTrace() *execinfrapb.ComponentStats { ContentionTime: optional.MakeTimeValue(contentionTime), ContentionEvents: contentionEvents, BatchRequestsIssued: optional.MakeUint(uint64(ij.fetcher.GetBatchRequestsIssued())), + KVCPUTime: optional.MakeTimeValue(ij.fetcher.GetKVCPUTime()), }, Exec: execinfrapb.ExecStats{ MaxAllocatedMem: optional.MakeUint(uint64(ij.MemMonitor.MaximumBytes())), diff --git a/pkg/sql/rowexec/joinreader.go b/pkg/sql/rowexec/joinreader.go index d5c11a15310e..bba9ec2d5468 100644 --- a/pkg/sql/rowexec/joinreader.go +++ b/pkg/sql/rowexec/joinreader.go @@ -1182,6 +1182,7 @@ func (jr *joinReader) execStatsForTrace() *execinfrapb.ComponentStats { ContentionTime: optional.MakeTimeValue(contentionTime), ContentionEvents: contentionEvents, BatchRequestsIssued: optional.MakeUint(uint64(jr.fetcher.GetBatchRequestsIssued())), + KVCPUTime: optional.MakeTimeValue(jr.fetcher.GetKVCPUTime()), }, Output: jr.OutputHelper.Stats(), } diff --git a/pkg/sql/rowexec/rowfetcher.go b/pkg/sql/rowexec/rowfetcher.go index 6952ecb4f075..32369b26356a 100644 --- a/pkg/sql/rowexec/rowfetcher.go +++ 
b/pkg/sql/rowexec/rowfetcher.go @@ -49,6 +49,7 @@ type rowFetcher interface { Reset() GetBytesRead() int64 GetBatchRequestsIssued() int64 + GetKVCPUTime() time.Duration // Close releases any resources held by this fetcher. Close(ctx context.Context) } diff --git a/pkg/sql/rowexec/tablereader.go b/pkg/sql/rowexec/tablereader.go index 1900e934d2c4..c8016cc8eb57 100644 --- a/pkg/sql/rowexec/tablereader.go +++ b/pkg/sql/rowexec/tablereader.go @@ -318,6 +318,7 @@ func (tr *tableReader) execStatsForTrace() *execinfrapb.ComponentStats { ContentionTime: optional.MakeTimeValue(contentionTime), ContentionEvents: contentionEvents, BatchRequestsIssued: optional.MakeUint(uint64(tr.fetcher.GetBatchRequestsIssued())), + KVCPUTime: optional.MakeTimeValue(tr.fetcher.GetKVCPUTime()), }, Output: tr.OutputHelper.Stats(), } diff --git a/pkg/sql/rowexec/zigzagjoiner.go b/pkg/sql/rowexec/zigzagjoiner.go index 21b210a55c34..bdb4fa78bced 100644 --- a/pkg/sql/rowexec/zigzagjoiner.go +++ b/pkg/sql/rowexec/zigzagjoiner.go @@ -12,6 +12,7 @@ package rowexec import ( "context" + "time" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" @@ -855,6 +856,7 @@ func (z *zigzagJoiner) execStatsForTrace() *execinfrapb.ComponentStats { ContentionTime: optional.MakeTimeValue(contentionTime), ContentionEvents: contentionEvents, BatchRequestsIssued: optional.MakeUint(uint64(z.getBatchRequestsIssued())), + KVCPUTime: optional.MakeTimeValue(z.getKVCPUTime()), } execstats.PopulateKVMVCCStats(&kvStats, &z.scanStats) for i := range z.infos { @@ -897,6 +899,14 @@ func (z *zigzagJoiner) getBatchRequestsIssued() int64 { return batchRequestsIssued } +func (z *zigzagJoiner) getKVCPUTime() time.Duration { + var kvCPUTime time.Duration + for i := range z.infos { + kvCPUTime += z.infos[i].fetcher.GetKVCPUTime() + } + return kvCPUTime +} + func (z *zigzagJoiner) generateMeta() []execinfrapb.ProducerMetadata { trailingMeta := make([]execinfrapb.ProducerMetadata, 1, 2) 
meta := &trailingMeta[0] diff --git a/pkg/util/timeutil/BUILD.bazel b/pkg/util/timeutil/BUILD.bazel index 9cbe9b1b811c..3e901ec9c8b4 100644 --- a/pkg/util/timeutil/BUILD.bazel +++ b/pkg/util/timeutil/BUILD.bazel @@ -4,6 +4,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "timeutil", srcs = [ + "cpustopwatch.go", "manual_time.go", "stopwatch.go", "time.go", @@ -17,6 +18,7 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/util/timeutil", visibility = ["//visibility:public"], deps = [ + "//pkg/util/grunning", "//pkg/util/syncutil", "@com_github_cockroachdb_errors//:errors", ], diff --git a/pkg/util/timeutil/cpustopwatch.go b/pkg/util/timeutil/cpustopwatch.go new file mode 100644 index 000000000000..eea5bcae89d4 --- /dev/null +++ b/pkg/util/timeutil/cpustopwatch.go @@ -0,0 +1,106 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package timeutil + +import ( + "time" + + "github.com/cockroachdb/cockroach/pkg/util/grunning" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" +) + +// CPUStopWatch is a wrapper around cpuStopWatch that is safe to use +// concurrently. If CPUStopWatch is nil, all operations are no-ops and no +// locks are acquired. +type CPUStopWatch struct { + mu struct { + syncutil.Mutex + cpuStopWatch cpuStopWatch + } +} + +// NewCPUStopWatch returns a new CPUStopWatch if the grunning library is +// supported. Otherwise, it returns nil. +func NewCPUStopWatch() *CPUStopWatch { + if grunning.Supported() { + return &CPUStopWatch{} + } + return nil +} + +// Start starts the CPU stop watch if it hasn't already been started. 
+func (w *CPUStopWatch) Start() { + if w == nil { + return + } + w.mu.Lock() + defer w.mu.Unlock() + w.mu.cpuStopWatch.start() +} + +// Stop stops the CPU stop watch if it hasn't already been stopped and +// accumulates the CPU time that has been spent since it was started. If the +// CPU stop watch has already been stopped, it is a noop. +func (w *CPUStopWatch) Stop() { + if w == nil { + return + } + w.mu.Lock() + defer w.mu.Unlock() + w.mu.cpuStopWatch.stop() +} + +// Elapsed returns the total CPU time measured by the stop watch so far. +func (w *CPUStopWatch) Elapsed() time.Duration { + if w == nil { + return 0 + } + w.mu.Lock() + defer w.mu.Unlock() + return w.mu.cpuStopWatch.elapsed() +} + +// cpuStopWatch is a utility stop watch for measuring CPU time spent by a +// component. It can be safely started and stopped multiple times (a start +// while running and a stop while stopped are no-ops), but is not safe to use +// concurrently. If cpuStopWatch is nil, all operations are no-ops. +type cpuStopWatch struct { + // started is true between a start call and the matching stop call. + started bool + startCPU time.Duration + totalCPU time.Duration +} + +// start records the grunning.Time reading that the next stop measures from. +func (w *cpuStopWatch) start() { + if w == nil || w.started { + return + } + w.started = true + w.startCPU = grunning.Time() +} + +// stop adds the CPU time spent since the last start to the running total. +func (w *cpuStopWatch) stop() { + if w == nil || !w.started { + return + } + w.started = false + w.totalCPU += grunning.Difference(w.startCPU, grunning.Time()) +} + +// elapsed returns the CPU time accumulated by completed start/stop pairs. 
+func (w *cpuStopWatch) elapsed() time.Duration { + if w == nil { + return 0 + } + return w.totalCPU +} diff --git a/pkg/util/timeutil/stopwatch.go b/pkg/util/timeutil/stopwatch.go index 04c03ca50450..a426301471ec 100644 --- a/pkg/util/timeutil/stopwatch.go +++ b/pkg/util/timeutil/stopwatch.go @@ -13,6 +13,7 @@ package timeutil import ( "time" + "github.com/cockroachdb/cockroach/pkg/util/grunning" "github.com/cockroachdb/cockroach/pkg/util/syncutil" ) @@ -32,6 +33,9 @@ type StopWatch struct { // timeSource is the source of time used by the stop watch. It is always // timeutil.Now except for tests. timeSource func() time.Time + // cpuStopWatch is used to track CPU usage. 
It may be nil, in which case any + // operations on it are no-ops. + cpuStopWatch *cpuStopWatch } } @@ -40,6 +44,16 @@ func NewStopWatch() *StopWatch { return newStopWatch(Now) } +// NewStopWatchWithCPU creates a new StopWatch that will track CPU usage in +// addition to wall-clock time. +func NewStopWatchWithCPU() *StopWatch { + w := newStopWatch(Now) + if grunning.Supported() { + w.mu.cpuStopWatch = &cpuStopWatch{} + } + return w +} + // NewTestStopWatch create a new StopWatch with the given time source. It is // used for testing only. func NewTestStopWatch(timeSource func() time.Time) *StopWatch { @@ -59,6 +73,7 @@ func (w *StopWatch) Start() { if !w.mu.started { w.mu.started = true w.mu.startedAt = w.mu.timeSource() + w.mu.cpuStopWatch.start() } } @@ -71,6 +86,7 @@ func (w *StopWatch) Stop() { if w.mu.started { w.mu.started = false w.mu.elapsed += w.mu.timeSource().Sub(w.mu.startedAt) + w.mu.cpuStopWatch.stop() } } @@ -81,6 +97,15 @@ func (w *StopWatch) Elapsed() time.Duration { return w.mu.elapsed } +// ElapsedCPU returns the total CPU time measured by the stop watch so far. It +// returns zero if cpuStopWatch is nil (which is the case if NewStopWatchWithCPU +// was not called or the platform does not support grunning). +func (w *StopWatch) ElapsedCPU() time.Duration { + w.mu.Lock() + defer w.mu.Unlock() + return w.mu.cpuStopWatch.elapsed() +} + // LastStartedAt returns the time the stopwatch was last started, and a bool // indicating if the stopwatch is currently started. func (w *StopWatch) LastStartedAt() (startedAt time.Time, started bool) {