Skip to content

Commit

Permalink
Merge #59812
Browse files Browse the repository at this point in the history
59812: execstats: fix query level stats calculation r=yuzefovich a=yuzefovich

**execstats: fix query level contention time calculation**

Previously, we were using an incorrect map to calculate the contention
time.

Release note: None

**execstats: fix GetQueryLevelStats when sub- and post-queries are present**

Previously, `GetQueryLevelStats` would incorrectly calculate the query
level stats when multiple `FlowMetadata` objects are passed in. That is
the case when either sub- or post-queries are present. As a result, we
would return the query level stats only for the last FlowMetadata
object. This is now fixed by aggregating the query level stats of all
FlowMetadata objects.

However, the test is missing since I don't yet see how to do it. It is
left as a TODO.

Additionally, this commit does some cleanup - using "Flows" instead of
"Flow" for metadata to indicate that multiple flows (that are part of
the same physical plan) correspond to a single metadata object (this was
a point of confusion to me), as well as some comments wrapping.

Release note: None

Co-authored-by: Yahor Yuzefovich <[email protected]>
  • Loading branch information
craig[bot] and yuzefovich committed Feb 10, 2021
2 parents 952af3c + 9489754 commit 79c80c4
Show file tree
Hide file tree
Showing 6 changed files with 137 additions and 85 deletions.
2 changes: 1 addition & 1 deletion pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -681,7 +681,7 @@ func (p *PlanningCtx) getDefaultSaveFlowsFunc(
return err
}
planner.curPlan.distSQLFlowInfos = append(
planner.curPlan.distSQLFlowInfos, flowInfo{typ: typ, diagram: diagram, flowMetadata: execstats.NewFlowMetadata(flows)},
planner.curPlan.distSQLFlowInfos, flowInfo{typ: typ, diagram: diagram, flowsMetadata: execstats.NewFlowsMetadata(flows)},
)
return nil
}
Expand Down
112 changes: 54 additions & 58 deletions pkg/sql/execstats/traceanalyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/cockroachdb/errors"
)

// processorStats contains stats for a specific processor extracted from a trace.
type processorStats struct {
nodeID roachpb.NodeID
stats *execinfrapb.ComponentStats
Expand All @@ -35,27 +34,30 @@ type flowStats struct {
stats []*execinfrapb.ComponentStats
}

// FlowMetadata contains metadata extracted from flows. This information is stored
// in sql.flowInfo and is analyzed by TraceAnalyzer.
type FlowMetadata struct {
// processorStats maps a processor ID to stats associated with this processor
// extracted from a trace as well as some metadata. Note that it is possible
// for the processorStats to have nil stats, which indicates that no stats
// were found for the given processor in the trace.
// FlowsMetadata contains metadata extracted from flows that comprise a single
// physical plan. This information is stored in sql.flowInfo and is analyzed by
// TraceAnalyzer.
type FlowsMetadata struct {
// processorStats maps a processor ID to stats associated with this
// processor extracted from a trace as well as some metadata. Note that it
// is possible for the processorStats to have nil stats, which indicates
// that no stats were found for the given processor in the trace.
processorStats map[execinfrapb.ProcessorID]*processorStats
// streamStats maps a stream ID to stats associated with this stream extracted
// from a trace as well as some metadata. Note that is is possible for the
// streamStats to have nil stats, which indicates that no stats were found
// for the given stream in the trace.
// streamStats maps a stream ID to stats associated with this stream
// extracted from a trace as well as some metadata. Note that it is possible
// for the streamStats to have nil stats, which indicates that no stats were
// found for the given stream in the trace.
streamStats map[execinfrapb.StreamID]*streamStats
// flowStats maps a node ID to flow level stats extracted from a trace. Note
// that the key is not a FlowID because the same FlowID is used across nodes.
// that the key is not a FlowID because the same FlowID is used across
// nodes.
flowStats map[roachpb.NodeID]*flowStats
}

// NewFlowMetadata creates a FlowMetadata with the given physical plan information.
func NewFlowMetadata(flows map[roachpb.NodeID]*execinfrapb.FlowSpec) *FlowMetadata {
a := &FlowMetadata{
// NewFlowsMetadata creates a FlowsMetadata for the given physical plan
// information.
func NewFlowsMetadata(flows map[roachpb.NodeID]*execinfrapb.FlowSpec) *FlowsMetadata {
a := &FlowsMetadata{
processorStats: make(map[execinfrapb.ProcessorID]*processorStats),
streamStats: make(map[execinfrapb.StreamID]*streamStats),
flowStats: make(map[roachpb.NodeID]*flowStats),
Expand Down Expand Up @@ -124,24 +126,16 @@ func (s *QueryLevelStats) Accumulate(other QueryLevelStats) {

// TraceAnalyzer is a struct that helps calculate top-level statistics from a
// flow metadata and an accompanying trace of the flows' execution.
// Example usage:
// analyzer := MakeTraceAnalyzer(flowMetadata)
// analyzer.AddTrace(trace)
// bytesGroupedByNode, err := analyzer.GetNetworkBytesSent()
type TraceAnalyzer struct {
*FlowMetadata
*FlowsMetadata
nodeLevelStats NodeLevelStats
queryLevelStats QueryLevelStats
}

// MakeTraceAnalyzer creates a TraceAnalyzer with the corresponding physical
// NewTraceAnalyzer creates a TraceAnalyzer with the corresponding physical
// plan. Call AddTrace to calculate meaningful stats.
func MakeTraceAnalyzer(flowMetadata *FlowMetadata) *TraceAnalyzer {
a := &TraceAnalyzer{
FlowMetadata: flowMetadata,
}

return a
func NewTraceAnalyzer(flowsMetadata *FlowsMetadata) *TraceAnalyzer {
return &TraceAnalyzer{FlowsMetadata: flowsMetadata}
}

// AddTrace adds the stats from the given trace to the TraceAnalyzer.
Expand Down Expand Up @@ -185,9 +179,10 @@ func (a *TraceAnalyzer) AddTrace(trace []tracingpb.RecordedSpan, makeDeterminist
return nil
}

// ProcessStats calculates node level and query level stats for the trace and stores them
// in TraceAnalyzer. If errors occur while calculating stats, ProcessStats returns the combined
// errors to the caller but continues calculating other stats.
// ProcessStats calculates node level and query level stats for the trace and
// stores them in TraceAnalyzer. If errors occur while calculating stats,
// ProcessStats returns the combined errors to the caller but continues
// calculating other stats.
func (a *TraceAnalyzer) ProcessStats() error {
// Process node level stats.
a.nodeLevelStats = NodeLevelStats{
Expand Down Expand Up @@ -226,13 +221,14 @@ func (a *TraceAnalyzer) ProcessStats() error {
a.nodeLevelStats.NetworkBytesSentGroupedByNode[stats.originNodeID] += bytes
}

// Set maxMemoryUsageFromStreamStats.
// The row execution flow attaches this stat to a stream stat with the last outbox, so we need to check
// stream stats for max memory usage.
// TODO(cathymw): maxMemUsage shouldn't be attached to span stats that are associated with streams,
// since it's a flow level stat. However, due to the row exec engine infrastructure, it is too
// complicated to attach this to a flow level span. If the row exec engine gets removed, getting
// maxMemUsage from streamStats should be removed as well.
// The row execution flow attaches this stat to a stream stat with the
// last outbox, so we need to check stream stats for max memory usage.
// TODO(cathymw): maxMemUsage shouldn't be attached to span stats that
// are associated with streams, since it's a flow level stat. However,
// due to the row exec engine infrastructure, it is too complicated to
// attach this to a flow level span. If the row exec engine gets
// removed, getting maxMemUsage from streamStats should be removed as
// well.
if stats.stats.FlowStats.MaxMemUsage.HasValue() {
if memUsage := int64(stats.stats.FlowStats.MaxMemUsage.Value()); memUsage > a.nodeLevelStats.MaxMemoryUsageGroupedByNode[stats.originNodeID] {
a.nodeLevelStats.MaxMemoryUsageGroupedByNode[stats.originNodeID] = memUsage
Expand All @@ -253,9 +249,8 @@ func (a *TraceAnalyzer) ProcessStats() error {
continue
}

// Set maxMemoryUsageFromFlowStats.
// The vectorized flow attaches the MaxMemUsage stat to a flow level span, so we need to check
// flow stats for max memory usage.
// The vectorized flow attaches the MaxMemUsage stat to a flow level
// span, so we need to check flow stats for max memory usage.
for _, v := range stats.stats {
if v.FlowStats.MaxMemUsage.HasValue() {
if memUsage := int64(v.FlowStats.MaxMemUsage.Value()); memUsage > a.nodeLevelStats.MaxMemoryUsageGroupedByNode[nodeID] {
Expand Down Expand Up @@ -301,16 +296,16 @@ func (a *TraceAnalyzer) ProcessStats() error {
a.queryLevelStats.NetworkMessages += networkMessages
}

for _, contentionTime := range a.nodeLevelStats.KVTimeGroupedByNode {
for _, contentionTime := range a.nodeLevelStats.ContentionTimeGroupedByNode {
a.queryLevelStats.ContentionTime += contentionTime
}
return errs
}

func getNetworkBytesFromComponentStats(v *execinfrapb.ComponentStats) (int64, error) {
// We expect exactly one of BytesReceived and BytesSent to be set.
// It may seem like we are double-counting everything (from both the send and
// the receive side) but in practice only one side of each stream presents
// We expect exactly one of BytesReceived and BytesSent to be set. It may
// seem like we are double-counting everything (from both the send and the
// receive side) but in practice only one side of each stream presents
// statistics (specifically the sending side in the row engine, and the
// receiving side in the vectorized engine).
if v.NetRx.BytesReceived.HasValue() {
Expand All @@ -326,8 +321,8 @@ func getNetworkBytesFromComponentStats(v *execinfrapb.ComponentStats) (int64, er
}

func getNumNetworkMessagesFromComponentsStats(v *execinfrapb.ComponentStats) (int64, error) {
// We expect exactly one of MessagesReceived and MessagesSent to be set.
// It may seem like we are double-counting everything (from both the send and
// We expect exactly one of MessagesReceived and MessagesSent to be set. It
// may seem like we are double-counting everything (from both the send and
// the receive side) but in practice only one side of each stream presents
// statistics (specifically the sending side in the row engine, and the
// receiving side in the vectorized engine).
Expand All @@ -343,27 +338,29 @@ func getNumNetworkMessagesFromComponentsStats(v *execinfrapb.ComponentStats) (in
return 0, errors.Errorf("could not get network messages; neither MessagesReceived and MessagesSent is set")
}

// GetNodeLevelStats returns the node level stats calculated and stored in the TraceAnalyzer.
// GetNodeLevelStats returns the node level stats calculated and stored in the
// TraceAnalyzer.
func (a *TraceAnalyzer) GetNodeLevelStats() NodeLevelStats {
return a.nodeLevelStats
}

// GetQueryLevelStats returns the query level stats calculated and stored in TraceAnalyzer.
// GetQueryLevelStats returns the query level stats calculated and stored in
// TraceAnalyzer.
func (a *TraceAnalyzer) GetQueryLevelStats() QueryLevelStats {
return a.queryLevelStats
}

// GetQueryLevelStats returns all the top-level stats in a QueryLevelStats struct.
// GetQueryLevelStats tries to process as many stats as possible. If errors occur
// while processing stats, GetQueryLevelStats returns the combined errors to the caller
// but continues calculating other stats.
// GetQueryLevelStats returns all the top-level stats in a QueryLevelStats
// struct. GetQueryLevelStats tries to process as many stats as possible. If
// errors occur while processing stats, GetQueryLevelStats returns the combined
// errors to the caller but continues calculating other stats.
func GetQueryLevelStats(
trace []tracingpb.RecordedSpan, deterministicExplainAnalyze bool, flowMetadata []*FlowMetadata,
trace []tracingpb.RecordedSpan, deterministicExplainAnalyze bool, flowsMetadata []*FlowsMetadata,
) (QueryLevelStats, error) {
var queryLevelStats QueryLevelStats
var errs error
for _, metadata := range flowMetadata {
analyzer := MakeTraceAnalyzer(metadata)
for _, metadata := range flowsMetadata {
analyzer := NewTraceAnalyzer(metadata)
if err := analyzer.AddTrace(trace, deterministicExplainAnalyze); err != nil {
errs = errors.CombineErrors(errs, errors.Wrap(err, "error analyzing trace statistics"))
continue
Expand All @@ -373,8 +370,7 @@ func GetQueryLevelStats(
errs = errors.CombineErrors(errs, err)
continue
}
queryLevelStats = analyzer.GetQueryLevelStats()
queryLevelStats.Accumulate(analyzer.GetQueryLevelStats())
}

return queryLevelStats, errs
}
71 changes: 59 additions & 12 deletions pkg/sql/execstats/traceanalyzer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ func TestTraceAnalyzer(t *testing.T) {
return func(map[roachpb.NodeID]*execinfrapb.FlowSpec) error { return nil }
}
return func(flows map[roachpb.NodeID]*execinfrapb.FlowSpec) error {
flowMetadata := execstats.NewFlowMetadata(flows)
analyzer := execstats.MakeTraceAnalyzer(flowMetadata)
flowsMetadata := execstats.NewFlowsMetadata(flows)
analyzer := execstats.NewTraceAnalyzer(flowsMetadata)
analyzerChan <- analyzer
return nil
}
Expand Down Expand Up @@ -186,11 +186,14 @@ func TestTraceAnalyzer(t *testing.T) {

func TestTraceAnalyzerProcessStats(t *testing.T) {
const (
node1Time = 3 * time.Second
node2Time = 5 * time.Second
cumulativeTime = node1Time + node2Time
node1KVTime = 1 * time.Second
node1ContentionTime = 2 * time.Second
node2KVTime = 3 * time.Second
node2ContentionTime = 4 * time.Second
cumulativeKVTime = node1KVTime + node2KVTime
cumulativeContentionTime = node1ContentionTime + node2ContentionTime
)
a := &execstats.TraceAnalyzer{FlowMetadata: &execstats.FlowMetadata{}}
a := &execstats.TraceAnalyzer{FlowsMetadata: &execstats.FlowsMetadata{}}
a.AddComponentStats(
1, /* nodeID */
&execinfrapb.ComponentStats{
Expand All @@ -199,8 +202,8 @@ func TestTraceAnalyzerProcessStats(t *testing.T) {
1, /* processorID */
),
KV: execinfrapb.KVStats{
KVTime: optional.MakeTimeValue(node1Time),
ContentionTime: optional.MakeTimeValue(node1Time),
KVTime: optional.MakeTimeValue(node1KVTime),
ContentionTime: optional.MakeTimeValue(node1ContentionTime),
},
},
)
Expand All @@ -213,15 +216,15 @@ func TestTraceAnalyzerProcessStats(t *testing.T) {
2, /* processorID */
),
KV: execinfrapb.KVStats{
KVTime: optional.MakeTimeValue(node2Time),
ContentionTime: optional.MakeTimeValue(node2Time),
KVTime: optional.MakeTimeValue(node2KVTime),
ContentionTime: optional.MakeTimeValue(node2ContentionTime),
},
},
)

expected := execstats.QueryLevelStats{
KVTime: cumulativeTime,
ContentionTime: cumulativeTime,
KVTime: cumulativeKVTime,
ContentionTime: cumulativeContentionTime,
}

assert.NoError(t, a.ProcessStats())
Expand Down Expand Up @@ -275,3 +278,47 @@ func TestQueryLevelStatsAccumulate(t *testing.T) {
)
}
}

// TestGetQueryLevelStatsAccumulates does a sanity check that GetQueryLevelStats
// accumulates the stats for all flows passed into it. It does so by creating
// two FlowsMetadata objects and, thus, simulating a subquery and a main query.
func TestGetQueryLevelStatsAccumulates(t *testing.T) {
const f1KVTime = 1 * time.Second
const f2KVTime = 3 * time.Second

// Artificially inject component stats directly into the FlowsMetadata (in
// the non-testing setting the stats come from the trace).
var f1, f2 execstats.FlowsMetadata
f1.AddComponentStats(
1, /* nodeID */
&execinfrapb.ComponentStats{
Component: execinfrapb.ProcessorComponentID(
execinfrapb.FlowID{UUID: uuid.MakeV4()},
1, /* processorID */
),
KV: execinfrapb.KVStats{
KVTime: optional.MakeTimeValue(f1KVTime),
},
},
)
f2.AddComponentStats(
2, /* nodeID */
&execinfrapb.ComponentStats{
Component: execinfrapb.ProcessorComponentID(
execinfrapb.FlowID{UUID: uuid.MakeV4()},
2, /* processorID */
),
KV: execinfrapb.KVStats{
KVTime: optional.MakeTimeValue(f2KVTime),
},
},
)

queryLevelStats, err := execstats.GetQueryLevelStats(
nil, /* trace */
false, /* deterministicExplainAnalyze */
[]*execstats.FlowsMetadata{&f1, &f2},
)
require.NoError(t, err)
require.Equal(t, f1KVTime+f2KVTime, queryLevelStats.KVTime)
}
26 changes: 17 additions & 9 deletions pkg/sql/execstats/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,32 +19,40 @@ import (
// processor/stream/flow specified in stats.ComponentID and the given node ID.
func (a *TraceAnalyzer) AddComponentStats(
nodeID roachpb.NodeID, stats *execinfrapb.ComponentStats,
) {
a.FlowsMetadata.AddComponentStats(nodeID, stats)
}

// AddComponentStats modifies FlowsMetadata to add stats for the
// processor/stream/flow specified in stats.ComponentID and the given node ID.
func (m *FlowsMetadata) AddComponentStats(
nodeID roachpb.NodeID, stats *execinfrapb.ComponentStats,
) {
switch stats.Component.Type {
case execinfrapb.ComponentID_PROCESSOR:
processorStat := &processorStats{
nodeID: nodeID,
stats: stats,
}
if a.FlowMetadata.processorStats == nil {
a.FlowMetadata.processorStats = make(map[execinfrapb.ProcessorID]*processorStats)
if m.processorStats == nil {
m.processorStats = make(map[execinfrapb.ProcessorID]*processorStats)
}
a.FlowMetadata.processorStats[execinfrapb.ProcessorID(stats.Component.ID)] = processorStat
m.processorStats[execinfrapb.ProcessorID(stats.Component.ID)] = processorStat
case execinfrapb.ComponentID_STREAM:
streamStat := &streamStats{
originNodeID: nodeID,
stats: stats,
}
if a.FlowMetadata.streamStats == nil {
a.FlowMetadata.streamStats = make(map[execinfrapb.StreamID]*streamStats)
if m.streamStats == nil {
m.streamStats = make(map[execinfrapb.StreamID]*streamStats)
}
a.FlowMetadata.streamStats[execinfrapb.StreamID(stats.Component.ID)] = streamStat
m.streamStats[execinfrapb.StreamID(stats.Component.ID)] = streamStat
default:
flowStat := &flowStats{}
flowStat.stats = append(flowStat.stats, stats)
if a.FlowMetadata.flowStats == nil {
a.FlowMetadata.flowStats = make(map[roachpb.NodeID]*flowStats)
if m.flowStats == nil {
m.flowStats = make(map[roachpb.NodeID]*flowStats)
}
a.FlowMetadata.flowStats[nodeID] = flowStat
m.flowStats[nodeID] = flowStat
}
}
6 changes: 3 additions & 3 deletions pkg/sql/instrumentation.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,11 +225,11 @@ func (ih *instrumentationHelper) Finish(
// TODO(radu): this should be unified with other stmt stats accesses.
stmtStats, _ := appStats.getStatsForStmt(ih.fingerprint, ih.implicitTxn, retErr, false)
if stmtStats != nil {
var flowMetadata []*execstats.FlowMetadata
var flowsMetadata []*execstats.FlowsMetadata
for _, flowInfo := range p.curPlan.distSQLFlowInfos {
flowMetadata = append(flowMetadata, flowInfo.flowMetadata)
flowsMetadata = append(flowsMetadata, flowInfo.flowsMetadata)
}
queryLevelStats, err := execstats.GetQueryLevelStats(trace, cfg.TestingKnobs.DeterministicExplainAnalyze, flowMetadata)
queryLevelStats, err := execstats.GetQueryLevelStats(trace, cfg.TestingKnobs.DeterministicExplainAnalyze, flowsMetadata)
if err != nil {
log.VInfof(ctx, 1, "error getting query level stats for statement %s: %+v", ast, err)
} else {
Expand Down
Loading

0 comments on commit 79c80c4

Please sign in to comment.