Skip to content

Commit

Permalink
execstats: fix GetQueryLevelStats when sub- and post-queries are present
Browse files Browse the repository at this point in the history
Previously, `GetQueryLevelStats` would incorrectly calculate the query
level stats when multiple `FlowMetadata` objects are passed in. That is
the case when either sub- or post-queries are present. As a result, we
would return the query level stats only for the last FlowMetadata
object. This is now fixed by aggregating the query level stats of all
FlowMetadata objects.

However, the test is missing since I don't yet see how to do it. It is
left as a TODO.

Additionally, this commit does some cleanup - using "Flows" instead of
"Flow" for metadata to indicate that multiple flows (that are part of
the same physical plan) correspond to a single metadata object (this was
a point of confusion to me), as well as some comments wrapping.

Release note: None
  • Loading branch information
yuzefovich committed Feb 4, 2021
1 parent 6c8dd8d commit cafb54d
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 75 deletions.
2 changes: 1 addition & 1 deletion pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -681,7 +681,7 @@ func (p *PlanningCtx) getDefaultSaveFlowsFunc(
return err
}
planner.curPlan.distSQLFlowInfos = append(
planner.curPlan.distSQLFlowInfos, flowInfo{typ: typ, diagram: diagram, flowMetadata: execstats.NewFlowMetadata(flows)},
planner.curPlan.distSQLFlowInfos, flowInfo{typ: typ, diagram: diagram, flowsMetadata: execstats.NewFlowsMetadata(flows)},
)
return nil
}
Expand Down
124 changes: 67 additions & 57 deletions pkg/sql/execstats/traceanalyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/cockroachdb/errors"
)

// processorStats contains stats for a specific processor extracted from a trace.
type processorStats struct {
nodeID roachpb.NodeID
stats *execinfrapb.ComponentStats
Expand All @@ -35,27 +34,29 @@ type flowStats struct {
stats []*execinfrapb.ComponentStats
}

// FlowMetadata contains metadata extracted from flows. This information is stored
// in sql.flowInfo and is analyzed by TraceAnalyzer.
type FlowMetadata struct {
// processorStats maps a processor ID to stats associated with this processor
// extracted from a trace as well as some metadata. Note that it is possible
// for the processorStats to have nil stats, which indicates that no stats
// were found for the given processor in the trace.
// FlowsMetadata contains metadata extracted from flows. This information is
// stored in sql.flowInfo and is analyzed by TraceAnalyzer.
type FlowsMetadata struct {
// processorStats maps a processor ID to stats associated with this
// processor extracted from a trace as well as some metadata. Note that it
// is possible for the processorStats to have nil stats, which indicates
// that no stats were found for the given processor in the trace.
processorStats map[execinfrapb.ProcessorID]*processorStats
// streamStats maps a stream ID to stats associated with this stream extracted
// from a trace as well as some metadata. Note that is is possible for the
// streamStats to have nil stats, which indicates that no stats were found
// for the given stream in the trace.
// streamStats maps a stream ID to stats associated with this stream
// extracted from a trace as well as some metadata. Note that it is possible
// for the streamStats to have nil stats, which indicates that no stats were
// found for the given stream in the trace.
streamStats map[execinfrapb.StreamID]*streamStats
// flowStats maps a node ID to flow level stats extracted from a trace. Note
// that the key is not a FlowID because the same FlowID is used across nodes.
// that the key is not a FlowID because the same FlowID is used across
// nodes.
flowStats map[roachpb.NodeID]*flowStats
}

// NewFlowMetadata creates a FlowMetadata with the given physical plan information.
func NewFlowMetadata(flows map[roachpb.NodeID]*execinfrapb.FlowSpec) *FlowMetadata {
a := &FlowMetadata{
// NewFlowsMetadata creates a FlowsMetadata for the given physical plan
// information.
func NewFlowsMetadata(flows map[roachpb.NodeID]*execinfrapb.FlowSpec) *FlowsMetadata {
a := &FlowsMetadata{
processorStats: make(map[execinfrapb.ProcessorID]*processorStats),
streamStats: make(map[execinfrapb.StreamID]*streamStats),
flowStats: make(map[roachpb.NodeID]*flowStats),
Expand Down Expand Up @@ -108,26 +109,31 @@ type QueryLevelStats struct {
ContentionTime time.Duration
}

// aggregate updates s to include stats from other.
func (s *QueryLevelStats) aggregate(other QueryLevelStats) {
s.NetworkBytesSent += other.NetworkBytesSent
if other.MaxMemUsage > s.MaxMemUsage {
s.MaxMemUsage = other.MaxMemUsage
}
s.KVBytesRead += other.KVBytesRead
s.KVRowsRead += other.KVRowsRead
s.KVTime += other.KVTime
s.NetworkMessages += other.NetworkMessages
s.ContentionTime += other.ContentionTime
}

// TraceAnalyzer is a struct that helps calculate top-level statistics from a
// flow metadata and an accompanying trace of the flows' execution.
// Example usage:
// analyzer := MakeTraceAnalyzer(flowMetadata)
// analyzer.AddTrace(trace)
// bytesGroupedByNode, err := analyzer.GetNetworkBytesSent()
type TraceAnalyzer struct {
*FlowMetadata
*FlowsMetadata
nodeLevelStats NodeLevelStats
queryLevelStats QueryLevelStats
}

// MakeTraceAnalyzer creates a TraceAnalyzer with the corresponding physical
// NewTraceAnalyzer creates a TraceAnalyzer with the corresponding physical
// plan. Call AddTrace to calculate meaningful stats.
func MakeTraceAnalyzer(flowMetadata *FlowMetadata) *TraceAnalyzer {
a := &TraceAnalyzer{
FlowMetadata: flowMetadata,
}

return a
func NewTraceAnalyzer(flowsMetadata *FlowsMetadata) *TraceAnalyzer {
return &TraceAnalyzer{FlowsMetadata: flowsMetadata}
}

// AddTrace adds the stats from the given trace to the TraceAnalyzer.
Expand Down Expand Up @@ -171,9 +177,10 @@ func (a *TraceAnalyzer) AddTrace(trace []tracingpb.RecordedSpan, makeDeterminist
return nil
}

// ProcessStats calculates node level and query level stats for the trace and stores them
// in TraceAnalyzer. If errors occur while calculating stats, ProcessStats returns the combined
// errors to the caller but continues calculating other stats.
// ProcessStats calculates node level and query level stats for the trace and
// stores them in TraceAnalyzer. If errors occur while calculating stats,
// ProcessStats returns the combined errors to the caller but continues
// calculating other stats.
func (a *TraceAnalyzer) ProcessStats() error {
// Process node level stats.
a.nodeLevelStats = NodeLevelStats{
Expand Down Expand Up @@ -212,13 +219,14 @@ func (a *TraceAnalyzer) ProcessStats() error {
a.nodeLevelStats.NetworkBytesSentGroupedByNode[stats.originNodeID] += bytes
}

// Set maxMemoryUsageFromStreamStats.
// The row execution flow attaches this stat to a stream stat with the last outbox, so we need to check
// stream stats for max memory usage.
// TODO(cathymw): maxMemUsage shouldn't be attached to span stats that are associated with streams,
// since it's a flow level stat. However, due to the row exec engine infrastructure, it is too
// complicated to attach this to a flow level span. If the row exec engine gets removed, getting
// maxMemUsage from streamStats should be removed as well.
// The row execution flow attaches this stat to a stream stat with the
// last outbox, so we need to check stream stats for max memory usage.
// TODO(cathymw): maxMemUsage shouldn't be attached to span stats that
// are associated with streams, since it's a flow level stat. However,
// due to the row exec engine infrastructure, it is too complicated to
// attach this to a flow level span. If the row exec engine gets
// removed, getting maxMemUsage from streamStats should be removed as
// well.
if stats.stats.FlowStats.MaxMemUsage.HasValue() {
if memUsage := int64(stats.stats.FlowStats.MaxMemUsage.Value()); memUsage > a.nodeLevelStats.MaxMemoryUsageGroupedByNode[stats.originNodeID] {
a.nodeLevelStats.MaxMemoryUsageGroupedByNode[stats.originNodeID] = memUsage
Expand All @@ -239,9 +247,8 @@ func (a *TraceAnalyzer) ProcessStats() error {
continue
}

// Set maxMemoryUsageFromFlowStats.
// The vectorized flow attaches the MaxMemUsage stat to a flow level span, so we need to check
// flow stats for max memory usage.
// The vectorized flow attaches the MaxMemUsage stat to a flow level
// span, so we need to check flow stats for max memory usage.
for _, v := range stats.stats {
if v.FlowStats.MaxMemUsage.HasValue() {
if memUsage := int64(v.FlowStats.MaxMemUsage.Value()); memUsage > a.nodeLevelStats.MaxMemoryUsageGroupedByNode[nodeID] {
Expand Down Expand Up @@ -294,9 +301,9 @@ func (a *TraceAnalyzer) ProcessStats() error {
}

func getNetworkBytesFromComponentStats(v *execinfrapb.ComponentStats) (int64, error) {
// We expect exactly one of BytesReceived and BytesSent to be set.
// It may seem like we are double-counting everything (from both the send and
// the receive side) but in practice only one side of each stream presents
// We expect exactly one of BytesReceived and BytesSent to be set. It may
// seem like we are double-counting everything (from both the send and the
// receive side) but in practice only one side of each stream presents
// statistics (specifically the sending side in the row engine, and the
// receiving side in the vectorized engine).
if v.NetRx.BytesReceived.HasValue() {
Expand All @@ -312,8 +319,8 @@ func getNetworkBytesFromComponentStats(v *execinfrapb.ComponentStats) (int64, er
}

func getNumNetworkMessagesFromComponentsStats(v *execinfrapb.ComponentStats) (int64, error) {
// We expect exactly one of MessagesReceived and MessagesSent to be set.
// It may seem like we are double-counting everything (from both the send and
// We expect exactly one of MessagesReceived and MessagesSent to be set. It
// may seem like we are double-counting everything (from both the send and
// the receive side) but in practice only one side of each stream presents
// statistics (specifically the sending side in the row engine, and the
// receiving side in the vectorized engine).
Expand All @@ -329,27 +336,31 @@ func getNumNetworkMessagesFromComponentsStats(v *execinfrapb.ComponentStats) (in
return 0, errors.Errorf("could not get network messages; neither MessagesReceived and MessagesSent is set")
}

// GetNodeLevelStats returns the node level stats calculated and stored in the TraceAnalyzer.
// GetNodeLevelStats returns the node level stats calculated and stored in the
// TraceAnalyzer.
func (a *TraceAnalyzer) GetNodeLevelStats() NodeLevelStats {
return a.nodeLevelStats
}

// GetQueryLevelStats returns the query level stats calculated and stored in TraceAnalyzer.
// GetQueryLevelStats returns the query level stats calculated and stored in
// TraceAnalyzer.
func (a *TraceAnalyzer) GetQueryLevelStats() QueryLevelStats {
return a.queryLevelStats
}

// GetQueryLevelStats returns all the top-level stats in a QueryLevelStats struct.
// GetQueryLevelStats tries to process as many stats as possible. If errors occur
// while processing stats, GetQueryLevelStats returns the combined errors to the caller
// but continues calculating other stats.
// GetQueryLevelStats returns all the top-level stats in a QueryLevelStats
// struct. GetQueryLevelStats tries to process as many stats as possible. If
// errors occur while processing stats, GetQueryLevelStats returns the combined
// errors to the caller but continues calculating other stats.
// TODO(yuzefovich): add a test for this method when multiple flowsMetadata
// objects are passed-in.
func GetQueryLevelStats(
trace []tracingpb.RecordedSpan, deterministicExplainAnalyze bool, flowMetadata []*FlowMetadata,
trace []tracingpb.RecordedSpan, deterministicExplainAnalyze bool, flowsMetadata []*FlowsMetadata,
) (QueryLevelStats, error) {
var queryLevelStats QueryLevelStats
var errs error
for _, metadata := range flowMetadata {
analyzer := MakeTraceAnalyzer(metadata)
for _, metadata := range flowsMetadata {
analyzer := NewTraceAnalyzer(metadata)
if err := analyzer.AddTrace(trace, deterministicExplainAnalyze); err != nil {
errs = errors.CombineErrors(errs, errors.Wrap(err, "error analyzing trace statistics"))
continue
Expand All @@ -359,8 +370,7 @@ func GetQueryLevelStats(
errs = errors.CombineErrors(errs, err)
continue
}
queryLevelStats = analyzer.GetQueryLevelStats()
queryLevelStats.aggregate(analyzer.GetQueryLevelStats())
}

return queryLevelStats, errs
}
6 changes: 3 additions & 3 deletions pkg/sql/execstats/traceanalyzer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ func TestTraceAnalyzer(t *testing.T) {
return func(map[roachpb.NodeID]*execinfrapb.FlowSpec) error { return nil }
}
return func(flows map[roachpb.NodeID]*execinfrapb.FlowSpec) error {
flowMetadata := execstats.NewFlowMetadata(flows)
analyzer := execstats.MakeTraceAnalyzer(flowMetadata)
flowsMetadata := execstats.NewFlowsMetadata(flows)
analyzer := execstats.NewTraceAnalyzer(flowsMetadata)
analyzerChan <- analyzer
return nil
}
Expand Down Expand Up @@ -193,7 +193,7 @@ func TestTraceAnalyzerProcessStats(t *testing.T) {
cumulativeKVTime = node1KVTime + node2KVTime
cumulativeContentionTime = node1ContentionTime + node2ContentionTime
)
a := &execstats.TraceAnalyzer{FlowMetadata: &execstats.FlowMetadata{}}
a := &execstats.TraceAnalyzer{FlowsMetadata: &execstats.FlowsMetadata{}}
a.AddComponentStats(
1, /* nodeID */
&execinfrapb.ComponentStats{
Expand Down
18 changes: 9 additions & 9 deletions pkg/sql/execstats/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,25 +26,25 @@ func (a *TraceAnalyzer) AddComponentStats(
nodeID: nodeID,
stats: stats,
}
if a.FlowMetadata.processorStats == nil {
a.FlowMetadata.processorStats = make(map[execinfrapb.ProcessorID]*processorStats)
if a.FlowsMetadata.processorStats == nil {
a.FlowsMetadata.processorStats = make(map[execinfrapb.ProcessorID]*processorStats)
}
a.FlowMetadata.processorStats[execinfrapb.ProcessorID(stats.Component.ID)] = processorStat
a.FlowsMetadata.processorStats[execinfrapb.ProcessorID(stats.Component.ID)] = processorStat
case execinfrapb.ComponentID_STREAM:
streamStat := &streamStats{
originNodeID: nodeID,
stats: stats,
}
if a.FlowMetadata.streamStats == nil {
a.FlowMetadata.streamStats = make(map[execinfrapb.StreamID]*streamStats)
if a.FlowsMetadata.streamStats == nil {
a.FlowsMetadata.streamStats = make(map[execinfrapb.StreamID]*streamStats)
}
a.FlowMetadata.streamStats[execinfrapb.StreamID(stats.Component.ID)] = streamStat
a.FlowsMetadata.streamStats[execinfrapb.StreamID(stats.Component.ID)] = streamStat
default:
flowStat := &flowStats{}
flowStat.stats = append(flowStat.stats, stats)
if a.FlowMetadata.flowStats == nil {
a.FlowMetadata.flowStats = make(map[roachpb.NodeID]*flowStats)
if a.FlowsMetadata.flowStats == nil {
a.FlowsMetadata.flowStats = make(map[roachpb.NodeID]*flowStats)
}
a.FlowMetadata.flowStats[nodeID] = flowStat
a.FlowsMetadata.flowStats[nodeID] = flowStat
}
}
6 changes: 3 additions & 3 deletions pkg/sql/instrumentation.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,11 +227,11 @@ func (ih *instrumentationHelper) Finish(
// TODO(radu): this should be unified with other stmt stats accesses.
stmtStats, _ := appStats.getStatsForStmt(ih.fingerprint, ih.implicitTxn, retErr, false)
if stmtStats != nil {
var flowMetadata []*execstats.FlowMetadata
var flowsMetadata []*execstats.FlowsMetadata
for _, flowInfo := range p.curPlan.distSQLFlowInfos {
flowMetadata = append(flowMetadata, flowInfo.flowMetadata)
flowsMetadata = append(flowsMetadata, flowInfo.flowsMetadata)
}
queryLevelStats, err := execstats.GetQueryLevelStats(trace, cfg.TestingKnobs.DeterministicExplainAnalyze, flowMetadata)
queryLevelStats, err := execstats.GetQueryLevelStats(trace, cfg.TestingKnobs.DeterministicExplainAnalyze, flowsMetadata)
if err != nil {
log.VInfof(ctx, 1, "error getting query level stats for statement %s: %+v", ast, err)
} else {
Expand Down
5 changes: 3 additions & 2 deletions pkg/sql/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,9 @@ var _ planNodeSpooled = &spoolNode{}
type flowInfo struct {
typ planComponentType
diagram execinfrapb.FlowDiagram
// FlowMetadata stores metadata from flows that will be used by TraceAnalyzer.
flowMetadata *execstats.FlowMetadata
// flowsMetadata stores metadata from flows that will be used by
// execstats.TraceAnalyzer.
flowsMetadata *execstats.FlowsMetadata
}

// planTop is the struct that collects the properties
Expand Down

0 comments on commit cafb54d

Please sign in to comment.