From ed0e01c54bf0355c3254e8a82c8a78559af29663 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Thu, 4 Feb 2021 08:42:59 -0800 Subject: [PATCH 1/2] execstats: fix query level contention time calculation Previously, we were using an incorrect map to calculate the contention time. Release note: None --- pkg/sql/execstats/traceanalyzer.go | 2 +- pkg/sql/execstats/traceanalyzer_test.go | 21 ++++++++++++--------- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/pkg/sql/execstats/traceanalyzer.go b/pkg/sql/execstats/traceanalyzer.go index 461beb78098d..50f8e783fe61 100644 --- a/pkg/sql/execstats/traceanalyzer.go +++ b/pkg/sql/execstats/traceanalyzer.go @@ -301,7 +301,7 @@ func (a *TraceAnalyzer) ProcessStats() error { a.queryLevelStats.NetworkMessages += networkMessages } - for _, contentionTime := range a.nodeLevelStats.KVTimeGroupedByNode { + for _, contentionTime := range a.nodeLevelStats.ContentionTimeGroupedByNode { a.queryLevelStats.ContentionTime += contentionTime } return errs diff --git a/pkg/sql/execstats/traceanalyzer_test.go b/pkg/sql/execstats/traceanalyzer_test.go index 48ded0314541..a09144aed779 100644 --- a/pkg/sql/execstats/traceanalyzer_test.go +++ b/pkg/sql/execstats/traceanalyzer_test.go @@ -186,9 +186,12 @@ func TestTraceAnalyzer(t *testing.T) { func TestTraceAnalyzerProcessStats(t *testing.T) { const ( - node1Time = 3 * time.Second - node2Time = 5 * time.Second - cumulativeTime = node1Time + node2Time + node1KVTime = 1 * time.Second + node1ContentionTime = 2 * time.Second + node2KVTime = 3 * time.Second + node2ContentionTime = 4 * time.Second + cumulativeKVTime = node1KVTime + node2KVTime + cumulativeContentionTime = node1ContentionTime + node2ContentionTime ) a := &execstats.TraceAnalyzer{FlowMetadata: &execstats.FlowMetadata{}} a.AddComponentStats( @@ -199,8 +202,8 @@ func TestTraceAnalyzerProcessStats(t *testing.T) { 1, /* processorID */ ), KV: execinfrapb.KVStats{ - KVTime: optional.MakeTimeValue(node1Time), - ContentionTime: 
optional.MakeTimeValue(node1Time), + KVTime: optional.MakeTimeValue(node1KVTime), + ContentionTime: optional.MakeTimeValue(node1ContentionTime), }, }, ) @@ -213,15 +216,15 @@ func TestTraceAnalyzerProcessStats(t *testing.T) { 2, /* processorID */ ), KV: execinfrapb.KVStats{ - KVTime: optional.MakeTimeValue(node2Time), - ContentionTime: optional.MakeTimeValue(node2Time), + KVTime: optional.MakeTimeValue(node2KVTime), + ContentionTime: optional.MakeTimeValue(node2ContentionTime), }, }, ) expected := execstats.QueryLevelStats{ - KVTime: cumulativeTime, - ContentionTime: cumulativeTime, + KVTime: cumulativeKVTime, + ContentionTime: cumulativeContentionTime, } assert.NoError(t, a.ProcessStats()) From 9489754565616a284054d2aae791060e493ca2d0 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Thu, 4 Feb 2021 10:36:32 -0800 Subject: [PATCH 2/2] execstats: fix GetQueryLevelStats when sub- and post-queries are present Previously, `GetQueryLevelStats` would incorrectly calculate the query level stats when multiple `FlowMetadata` objects were passed in. That is the case when either sub- or post-queries are present. As a result, we would return the query level stats only for the last FlowMetadata object. This is now fixed by aggregating the query level stats of all FlowMetadata objects. Additionally, this commit does some cleanup - using "Flows" instead of "Flow" for metadata to indicate that multiple flows (that are part of the same physical plan) correspond to a single metadata object (this was a point of confusion for me), as well as rewrapping some comments.
Release note: None --- pkg/sql/distsql_physical_planner.go | 2 +- pkg/sql/execstats/traceanalyzer.go | 110 ++++++++++++------------ pkg/sql/execstats/traceanalyzer_test.go | 50 ++++++++++- pkg/sql/execstats/utils_test.go | 26 ++++-- pkg/sql/instrumentation.go | 6 +- pkg/sql/plan.go | 5 +- 6 files changed, 124 insertions(+), 75 deletions(-) diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index aa017d79dbd8..aed23320a0c4 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -681,7 +681,7 @@ func (p *PlanningCtx) getDefaultSaveFlowsFunc( return err } planner.curPlan.distSQLFlowInfos = append( - planner.curPlan.distSQLFlowInfos, flowInfo{typ: typ, diagram: diagram, flowMetadata: execstats.NewFlowMetadata(flows)}, + planner.curPlan.distSQLFlowInfos, flowInfo{typ: typ, diagram: diagram, flowsMetadata: execstats.NewFlowsMetadata(flows)}, ) return nil } diff --git a/pkg/sql/execstats/traceanalyzer.go b/pkg/sql/execstats/traceanalyzer.go index 50f8e783fe61..a4f19cf9c2bc 100644 --- a/pkg/sql/execstats/traceanalyzer.go +++ b/pkg/sql/execstats/traceanalyzer.go @@ -19,7 +19,6 @@ import ( "github.com/cockroachdb/errors" ) -// processorStats contains stats for a specific processor extracted from a trace. type processorStats struct { nodeID roachpb.NodeID stats *execinfrapb.ComponentStats @@ -35,27 +34,30 @@ type flowStats struct { stats []*execinfrapb.ComponentStats } -// FlowMetadata contains metadata extracted from flows. This information is stored -// in sql.flowInfo and is analyzed by TraceAnalyzer. -type FlowMetadata struct { - // processorStats maps a processor ID to stats associated with this processor - // extracted from a trace as well as some metadata. Note that it is possible - // for the processorStats to have nil stats, which indicates that no stats - // were found for the given processor in the trace. 
+// FlowsMetadata contains metadata extracted from flows that comprise a single +// physical plan. This information is stored in sql.flowInfo and is analyzed by +// TraceAnalyzer. +type FlowsMetadata struct { + // processorStats maps a processor ID to stats associated with this + // processor extracted from a trace as well as some metadata. Note that it + // is possible for the processorStats to have nil stats, which indicates + // that no stats were found for the given processor in the trace. processorStats map[execinfrapb.ProcessorID]*processorStats - // streamStats maps a stream ID to stats associated with this stream extracted - // from a trace as well as some metadata. Note that is is possible for the - // streamStats to have nil stats, which indicates that no stats were found - // for the given stream in the trace. + // streamStats maps a stream ID to stats associated with this stream + // extracted from a trace as well as some metadata. Note that it is possible + // for the streamStats to have nil stats, which indicates that no stats were + // found for the given stream in the trace. streamStats map[execinfrapb.StreamID]*streamStats // flowStats maps a node ID to flow level stats extracted from a trace. Note - // that the key is not a FlowID because the same FlowID is used across nodes. + // that the key is not a FlowID because the same FlowID is used across + // nodes. flowStats map[roachpb.NodeID]*flowStats } -// NewFlowMetadata creates a FlowMetadata with the given physical plan information. -func NewFlowMetadata(flows map[roachpb.NodeID]*execinfrapb.FlowSpec) *FlowMetadata { - a := &FlowMetadata{ +// NewFlowsMetadata creates a FlowsMetadata for the given physical plan +// information. 
+func NewFlowsMetadata(flows map[roachpb.NodeID]*execinfrapb.FlowSpec) *FlowsMetadata { + a := &FlowsMetadata{ processorStats: make(map[execinfrapb.ProcessorID]*processorStats), streamStats: make(map[execinfrapb.StreamID]*streamStats), flowStats: make(map[roachpb.NodeID]*flowStats), @@ -124,24 +126,16 @@ func (s *QueryLevelStats) Accumulate(other QueryLevelStats) { // TraceAnalyzer is a struct that helps calculate top-level statistics from a // flow metadata and an accompanying trace of the flows' execution. -// Example usage: -// analyzer := MakeTraceAnalyzer(flowMetadata) -// analyzer.AddTrace(trace) -// bytesGroupedByNode, err := analyzer.GetNetworkBytesSent() type TraceAnalyzer struct { - *FlowMetadata + *FlowsMetadata nodeLevelStats NodeLevelStats queryLevelStats QueryLevelStats } -// MakeTraceAnalyzer creates a TraceAnalyzer with the corresponding physical +// NewTraceAnalyzer creates a TraceAnalyzer with the corresponding physical // plan. Call AddTrace to calculate meaningful stats. -func MakeTraceAnalyzer(flowMetadata *FlowMetadata) *TraceAnalyzer { - a := &TraceAnalyzer{ - FlowMetadata: flowMetadata, - } - - return a +func NewTraceAnalyzer(flowsMetadata *FlowsMetadata) *TraceAnalyzer { + return &TraceAnalyzer{FlowsMetadata: flowsMetadata} } // AddTrace adds the stats from the given trace to the TraceAnalyzer. @@ -185,9 +179,10 @@ func (a *TraceAnalyzer) AddTrace(trace []tracingpb.RecordedSpan, makeDeterminist return nil } -// ProcessStats calculates node level and query level stats for the trace and stores them -// in TraceAnalyzer. If errors occur while calculating stats, ProcessStats returns the combined -// errors to the caller but continues calculating other stats. +// ProcessStats calculates node level and query level stats for the trace and +// stores them in TraceAnalyzer. If errors occur while calculating stats, +// ProcessStats returns the combined errors to the caller but continues +// calculating other stats. 
func (a *TraceAnalyzer) ProcessStats() error { // Process node level stats. a.nodeLevelStats = NodeLevelStats{ @@ -226,13 +221,14 @@ func (a *TraceAnalyzer) ProcessStats() error { a.nodeLevelStats.NetworkBytesSentGroupedByNode[stats.originNodeID] += bytes } - // Set maxMemoryUsageFromStreamStats. - // The row execution flow attaches this stat to a stream stat with the last outbox, so we need to check - // stream stats for max memory usage. - // TODO(cathymw): maxMemUsage shouldn't be attached to span stats that are associated with streams, - // since it's a flow level stat. However, due to the row exec engine infrastructure, it is too - // complicated to attach this to a flow level span. If the row exec engine gets removed, getting - // maxMemUsage from streamStats should be removed as well. + // The row execution flow attaches this stat to a stream stat with the + // last outbox, so we need to check stream stats for max memory usage. + // TODO(cathymw): maxMemUsage shouldn't be attached to span stats that + // are associated with streams, since it's a flow level stat. However, + // due to the row exec engine infrastructure, it is too complicated to + // attach this to a flow level span. If the row exec engine gets + // removed, getting maxMemUsage from streamStats should be removed as + // well. if stats.stats.FlowStats.MaxMemUsage.HasValue() { if memUsage := int64(stats.stats.FlowStats.MaxMemUsage.Value()); memUsage > a.nodeLevelStats.MaxMemoryUsageGroupedByNode[stats.originNodeID] { a.nodeLevelStats.MaxMemoryUsageGroupedByNode[stats.originNodeID] = memUsage @@ -253,9 +249,8 @@ func (a *TraceAnalyzer) ProcessStats() error { continue } - // Set maxMemoryUsageFromFlowStats. - // The vectorized flow attaches the MaxMemUsage stat to a flow level span, so we need to check - // flow stats for max memory usage. + // The vectorized flow attaches the MaxMemUsage stat to a flow level + // span, so we need to check flow stats for max memory usage. 
for _, v := range stats.stats { if v.FlowStats.MaxMemUsage.HasValue() { if memUsage := int64(v.FlowStats.MaxMemUsage.Value()); memUsage > a.nodeLevelStats.MaxMemoryUsageGroupedByNode[nodeID] { @@ -308,9 +303,9 @@ func (a *TraceAnalyzer) ProcessStats() error { } func getNetworkBytesFromComponentStats(v *execinfrapb.ComponentStats) (int64, error) { - // We expect exactly one of BytesReceived and BytesSent to be set. - // It may seem like we are double-counting everything (from both the send and - // the receive side) but in practice only one side of each stream presents + // We expect exactly one of BytesReceived and BytesSent to be set. It may + // seem like we are double-counting everything (from both the send and the + // receive side) but in practice only one side of each stream presents // statistics (specifically the sending side in the row engine, and the // receiving side in the vectorized engine). if v.NetRx.BytesReceived.HasValue() { @@ -326,8 +321,8 @@ func getNetworkBytesFromComponentStats(v *execinfrapb.ComponentStats) (int64, er } func getNumNetworkMessagesFromComponentsStats(v *execinfrapb.ComponentStats) (int64, error) { - // We expect exactly one of MessagesReceived and MessagesSent to be set. - // It may seem like we are double-counting everything (from both the send and + // We expect exactly one of MessagesReceived and MessagesSent to be set. It + // may seem like we are double-counting everything (from both the send and // the receive side) but in practice only one side of each stream presents // statistics (specifically the sending side in the row engine, and the // receiving side in the vectorized engine). @@ -343,27 +338,29 @@ func getNumNetworkMessagesFromComponentsStats(v *execinfrapb.ComponentStats) (in return 0, errors.Errorf("could not get network messages; neither MessagesReceived and MessagesSent is set") } -// GetNodeLevelStats returns the node level stats calculated and stored in the TraceAnalyzer. 
+// GetNodeLevelStats returns the node level stats calculated and stored in the +// TraceAnalyzer. func (a *TraceAnalyzer) GetNodeLevelStats() NodeLevelStats { return a.nodeLevelStats } -// GetQueryLevelStats returns the query level stats calculated and stored in TraceAnalyzer. +// GetQueryLevelStats returns the query level stats calculated and stored in +// TraceAnalyzer. func (a *TraceAnalyzer) GetQueryLevelStats() QueryLevelStats { return a.queryLevelStats } -// GetQueryLevelStats returns all the top-level stats in a QueryLevelStats struct. -// GetQueryLevelStats tries to process as many stats as possible. If errors occur -// while processing stats, GetQueryLevelStats returns the combined errors to the caller -// but continues calculating other stats. +// GetQueryLevelStats returns all the top-level stats in a QueryLevelStats +// struct. GetQueryLevelStats tries to process as many stats as possible. If +// errors occur while processing stats, GetQueryLevelStats returns the combined +// errors to the caller but continues calculating other stats. 
func GetQueryLevelStats( - trace []tracingpb.RecordedSpan, deterministicExplainAnalyze bool, flowMetadata []*FlowMetadata, + trace []tracingpb.RecordedSpan, deterministicExplainAnalyze bool, flowsMetadata []*FlowsMetadata, ) (QueryLevelStats, error) { var queryLevelStats QueryLevelStats var errs error - for _, metadata := range flowMetadata { - analyzer := MakeTraceAnalyzer(metadata) + for _, metadata := range flowsMetadata { + analyzer := NewTraceAnalyzer(metadata) if err := analyzer.AddTrace(trace, deterministicExplainAnalyze); err != nil { errs = errors.CombineErrors(errs, errors.Wrap(err, "error analyzing trace statistics")) continue @@ -373,8 +370,7 @@ func GetQueryLevelStats( errs = errors.CombineErrors(errs, err) continue } - queryLevelStats = analyzer.GetQueryLevelStats() + queryLevelStats.Accumulate(analyzer.GetQueryLevelStats()) } - return queryLevelStats, errs } diff --git a/pkg/sql/execstats/traceanalyzer_test.go b/pkg/sql/execstats/traceanalyzer_test.go index a09144aed779..758ead28c526 100644 --- a/pkg/sql/execstats/traceanalyzer_test.go +++ b/pkg/sql/execstats/traceanalyzer_test.go @@ -63,8 +63,8 @@ func TestTraceAnalyzer(t *testing.T) { return func(map[roachpb.NodeID]*execinfrapb.FlowSpec) error { return nil } } return func(flows map[roachpb.NodeID]*execinfrapb.FlowSpec) error { - flowMetadata := execstats.NewFlowMetadata(flows) - analyzer := execstats.MakeTraceAnalyzer(flowMetadata) + flowsMetadata := execstats.NewFlowsMetadata(flows) + analyzer := execstats.NewTraceAnalyzer(flowsMetadata) analyzerChan <- analyzer return nil } @@ -193,7 +193,7 @@ func TestTraceAnalyzerProcessStats(t *testing.T) { cumulativeKVTime = node1KVTime + node2KVTime cumulativeContentionTime = node1ContentionTime + node2ContentionTime ) - a := &execstats.TraceAnalyzer{FlowMetadata: &execstats.FlowMetadata{}} + a := &execstats.TraceAnalyzer{FlowsMetadata: &execstats.FlowsMetadata{}} a.AddComponentStats( 1, /* nodeID */ &execinfrapb.ComponentStats{ @@ -278,3 +278,47 @@ func 
TestQueryLevelStatsAccumulate(t *testing.T) { ) } } + +// TestGetQueryLevelStatsAccumulates does a sanity check that GetQueryLevelStats +// accumulates the stats for all flows passed into it. It does so by creating +// two FlowsMetadata objects and, thus, simulating a subquery and a main query. +func TestGetQueryLevelStatsAccumulates(t *testing.T) { + const f1KVTime = 1 * time.Second + const f2KVTime = 3 * time.Second + + // Artificially inject component stats directly into the FlowsMetadata (in + // the non-testing setting the stats come from the trace). + var f1, f2 execstats.FlowsMetadata + f1.AddComponentStats( + 1, /* nodeID */ + &execinfrapb.ComponentStats{ + Component: execinfrapb.ProcessorComponentID( + execinfrapb.FlowID{UUID: uuid.MakeV4()}, + 1, /* processorID */ + ), + KV: execinfrapb.KVStats{ + KVTime: optional.MakeTimeValue(f1KVTime), + }, + }, + ) + f2.AddComponentStats( + 2, /* nodeID */ + &execinfrapb.ComponentStats{ + Component: execinfrapb.ProcessorComponentID( + execinfrapb.FlowID{UUID: uuid.MakeV4()}, + 2, /* processorID */ + ), + KV: execinfrapb.KVStats{ + KVTime: optional.MakeTimeValue(f2KVTime), + }, + }, + ) + + queryLevelStats, err := execstats.GetQueryLevelStats( + nil, /* trace */ + false, /* deterministicExplainAnalyze */ + []*execstats.FlowsMetadata{&f1, &f2}, + ) + require.NoError(t, err) + require.Equal(t, f1KVTime+f2KVTime, queryLevelStats.KVTime) +} diff --git a/pkg/sql/execstats/utils_test.go b/pkg/sql/execstats/utils_test.go index 3ee5aea41ed4..9519591726f8 100644 --- a/pkg/sql/execstats/utils_test.go +++ b/pkg/sql/execstats/utils_test.go @@ -19,6 +19,14 @@ import ( // processor/stream/flow specified in stats.ComponentID and the given node ID. 
func (a *TraceAnalyzer) AddComponentStats( nodeID roachpb.NodeID, stats *execinfrapb.ComponentStats, +) { + a.FlowsMetadata.AddComponentStats(nodeID, stats) +} + +// AddComponentStats modifies FlowsMetadata to add stats for the +// processor/stream/flow specified in stats.ComponentID and the given node ID. +func (m *FlowsMetadata) AddComponentStats( + nodeID roachpb.NodeID, stats *execinfrapb.ComponentStats, ) { switch stats.Component.Type { case execinfrapb.ComponentID_PROCESSOR: @@ -26,25 +34,25 @@ func (a *TraceAnalyzer) AddComponentStats( nodeID: nodeID, stats: stats, } - if a.FlowMetadata.processorStats == nil { - a.FlowMetadata.processorStats = make(map[execinfrapb.ProcessorID]*processorStats) + if m.processorStats == nil { + m.processorStats = make(map[execinfrapb.ProcessorID]*processorStats) } - a.FlowMetadata.processorStats[execinfrapb.ProcessorID(stats.Component.ID)] = processorStat + m.processorStats[execinfrapb.ProcessorID(stats.Component.ID)] = processorStat case execinfrapb.ComponentID_STREAM: streamStat := &streamStats{ originNodeID: nodeID, stats: stats, } - if a.FlowMetadata.streamStats == nil { - a.FlowMetadata.streamStats = make(map[execinfrapb.StreamID]*streamStats) + if m.streamStats == nil { + m.streamStats = make(map[execinfrapb.StreamID]*streamStats) } - a.FlowMetadata.streamStats[execinfrapb.StreamID(stats.Component.ID)] = streamStat + m.streamStats[execinfrapb.StreamID(stats.Component.ID)] = streamStat default: flowStat := &flowStats{} flowStat.stats = append(flowStat.stats, stats) - if a.FlowMetadata.flowStats == nil { - a.FlowMetadata.flowStats = make(map[roachpb.NodeID]*flowStats) + if m.flowStats == nil { + m.flowStats = make(map[roachpb.NodeID]*flowStats) } - a.FlowMetadata.flowStats[nodeID] = flowStat + m.flowStats[nodeID] = flowStat } } diff --git a/pkg/sql/instrumentation.go b/pkg/sql/instrumentation.go index 62261a5a449f..2c278d98cc50 100644 --- a/pkg/sql/instrumentation.go +++ b/pkg/sql/instrumentation.go @@ -225,11 +225,11 @@ 
func (ih *instrumentationHelper) Finish( // TODO(radu): this should be unified with other stmt stats accesses. stmtStats, _ := appStats.getStatsForStmt(ih.fingerprint, ih.implicitTxn, retErr, false) if stmtStats != nil { - var flowMetadata []*execstats.FlowMetadata + var flowsMetadata []*execstats.FlowsMetadata for _, flowInfo := range p.curPlan.distSQLFlowInfos { - flowMetadata = append(flowMetadata, flowInfo.flowMetadata) + flowsMetadata = append(flowsMetadata, flowInfo.flowsMetadata) } - queryLevelStats, err := execstats.GetQueryLevelStats(trace, cfg.TestingKnobs.DeterministicExplainAnalyze, flowMetadata) + queryLevelStats, err := execstats.GetQueryLevelStats(trace, cfg.TestingKnobs.DeterministicExplainAnalyze, flowsMetadata) if err != nil { log.VInfof(ctx, 1, "error getting query level stats for statement %s: %+v", ast, err) } else { diff --git a/pkg/sql/plan.go b/pkg/sql/plan.go index de3bffa5d787..bc6d7a0279d1 100644 --- a/pkg/sql/plan.go +++ b/pkg/sql/plan.go @@ -267,8 +267,9 @@ var _ planNodeSpooled = &spoolNode{} type flowInfo struct { typ planComponentType diagram execinfrapb.FlowDiagram - // FlowMetadata stores metadata from flows that will be used by TraceAnalyzer. - flowMetadata *execstats.FlowMetadata + // flowsMetadata stores metadata from flows that will be used by + // execstats.TraceAnalyzer. + flowsMetadata *execstats.FlowsMetadata } // planTop is the struct that collects the properties