Skip to content

Commit

Permalink
execstats: ensure that all nodes emit maximum memory usage
Browse files Browse the repository at this point in the history
This commit adds a test assertion to verify that the maximum memory usage
grouped by node has as many entries as there are nodes. This would have
uncovered the bug where the gateway node was not emitting stats if no outboxes
were present on it.

Additionally, flowStats are now keyed by nodeID rather than flowID, since the
assumption that a flowID uniquely identifies a flow is incorrect (the same
flowID is used across nodes), whereas a nodeID does identify it uniquely.

Release note: None
  • Loading branch information
asubiotto committed Jan 25, 2021
1 parent 5fd21c2 commit cb7ee1c
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 67 deletions.
1 change: 0 additions & 1 deletion pkg/sql/execstats/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ go_library(
"//pkg/roachpb",
"//pkg/sql/execinfrapb",
"//pkg/util/tracing/tracingpb",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
],
)
Expand Down
33 changes: 17 additions & 16 deletions pkg/sql/execstats/traceanalyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
)

Expand All @@ -33,8 +32,7 @@ type streamStats struct {
}

type flowStats struct {
nodeID roachpb.NodeID
stats []*execinfrapb.ComponentStats
stats []*execinfrapb.ComponentStats
}

// FlowMetadata contains metadata extracted from flows. This information is stored
Expand All @@ -50,21 +48,22 @@ type FlowMetadata struct {
// streamStats to have nil stats, which indicates that no stats were found
// for the given stream in the trace.
streamStats map[execinfrapb.StreamID]*streamStats
// flowStats maps a flow ID to flow level stats extracted from a trace.
flowStats map[execinfrapb.FlowID]*flowStats
// flowStats maps a node ID to flow level stats extracted from a trace. Note
// that the key is not a FlowID because the same FlowID is used across nodes.
flowStats map[roachpb.NodeID]*flowStats
}

// NewFlowMetadata creates a FlowMetadata with the given physical plan information.
func NewFlowMetadata(flows map[roachpb.NodeID]*execinfrapb.FlowSpec) *FlowMetadata {
a := &FlowMetadata{
processorStats: make(map[execinfrapb.ProcessorID]*processorStats),
streamStats: make(map[execinfrapb.StreamID]*streamStats),
flowStats: make(map[execinfrapb.FlowID]*flowStats),
flowStats: make(map[roachpb.NodeID]*flowStats),
}

// Annotate the maps with physical plan information.
for nodeID, flow := range flows {
a.flowStats[flow.FlowID] = &flowStats{nodeID: nodeID}
a.flowStats[nodeID] = &flowStats{}
for _, proc := range flow.Processors {
a.processorStats[execinfrapb.ProcessorID(proc.ProcessorID)] = &processorStats{nodeID: nodeID}
for _, output := range proc.Output {
Expand Down Expand Up @@ -151,13 +150,15 @@ func (a *TraceAnalyzer) AddTrace(trace []tracingpb.RecordedSpan, makeDeterminist
streamStats.stats = componentStats

case execinfrapb.ComponentID_FLOW:
if id := component.FlowID.UUID; id != (uuid.UUID{}) {
flowStats := a.flowStats[execinfrapb.FlowID{UUID: id}]
if flowStats == nil {
return errors.Errorf("trace has span for flow %s but the flow does not exist in the physical plan", id)
}
flowStats.stats = append(flowStats.stats, componentStats)
flowStats := a.flowStats[component.NodeID]
if flowStats == nil {
return errors.Errorf(
"trace has span for flow %s on node %s but the flow does not exist in the physical plan",
component.FlowID,
component.NodeID,
)
}
flowStats.stats = append(flowStats.stats, componentStats)
}
}

Expand Down Expand Up @@ -225,7 +226,7 @@ func (a *TraceAnalyzer) ProcessStats() error {
}

// Process flowStats.
for _, stats := range a.flowStats {
for nodeID, stats := range a.flowStats {
if stats.stats == nil {
continue
}
Expand All @@ -235,8 +236,8 @@ func (a *TraceAnalyzer) ProcessStats() error {
// flow stats for max memory usage.
for _, v := range stats.stats {
if v.FlowStats.MaxMemUsage.HasValue() {
if memUsage := int64(v.FlowStats.MaxMemUsage.Value()); memUsage > a.nodeLevelStats.MaxMemoryUsageGroupedByNode[stats.nodeID] {
a.nodeLevelStats.MaxMemoryUsageGroupedByNode[stats.nodeID] = memUsage
if memUsage := int64(v.FlowStats.MaxMemUsage.Value()); memUsage > a.nodeLevelStats.MaxMemoryUsageGroupedByNode[nodeID] {
a.nodeLevelStats.MaxMemoryUsageGroupedByNode[nodeID] = memUsage
}
}
}
Expand Down
54 changes: 31 additions & 23 deletions pkg/sql/execstats/traceanalyzer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,40 +139,48 @@ func TestTraceAnalyzer(t *testing.T) {
}

for _, tc := range []struct {
name string
analyzer *execstats.TraceAnalyzer
expectedMaxMemUsage int64
}{
{
name: "RowExec",
analyzer: rowexecTraceAnalyzer,
expectedMaxMemUsage: int64(20480),
},
{
name: "ColExec",
analyzer: colexecTraceAnalyzer,
expectedMaxMemUsage: int64(30720),
expectedMaxMemUsage: int64(51200),
},
} {
nodeLevelStats := tc.analyzer.GetNodeLevelStats()
require.Equal(
t, numNodes-1, len(nodeLevelStats.NetworkBytesSentGroupedByNode), "expected all nodes minus the gateway node to have sent bytes",
)

queryLevelStats := tc.analyzer.GetQueryLevelStats()

// The stats don't count the actual bytes, but they are a synthetic value
// based on the number of tuples. In this test 21 tuples flow over the
// network.
require.Equal(t, int64(21*8), queryLevelStats.NetworkBytesSent)

require.Equal(t, tc.expectedMaxMemUsage, queryLevelStats.MaxMemUsage)

require.Equal(t, int64(30), queryLevelStats.KVRowsRead)
// For tests, the bytes read is based on the number of rows read, rather
// than actual bytes read.
require.Equal(t, int64(30*8), queryLevelStats.KVBytesRead)

// For tests, network messages is a synthetic value based on the number of
// network tuples. In this test 21 tuples flow over the network.
require.Equal(t, int64(21/2), queryLevelStats.NetworkMessages)
t.Run(tc.name, func(t *testing.T) {
nodeLevelStats := tc.analyzer.GetNodeLevelStats()
require.Equal(
t, numNodes-1, len(nodeLevelStats.NetworkBytesSentGroupedByNode), "expected all nodes minus the gateway node to have sent bytes",
)
require.Equal(
t, numNodes, len(nodeLevelStats.MaxMemoryUsageGroupedByNode), "expected all nodes to have specified maximum memory usage",
)

queryLevelStats := tc.analyzer.GetQueryLevelStats()

// The stats don't count the actual bytes, but they are a synthetic value
// based on the number of tuples. In this test 21 tuples flow over the
// network.
require.Equal(t, int64(21*8), queryLevelStats.NetworkBytesSent)

require.Equal(t, tc.expectedMaxMemUsage, queryLevelStats.MaxMemUsage)

require.Equal(t, int64(30), queryLevelStats.KVRowsRead)
// For tests, the bytes read is based on the number of rows read, rather
// than actual bytes read.
require.Equal(t, int64(30*8), queryLevelStats.KVBytesRead)

// For tests, network messages is a synthetic value based on the number of
// network tuples. In this test 21 tuples flow over the network.
require.Equal(t, int64(21/2), queryLevelStats.NetworkMessages)
})
}
}

Expand Down
12 changes: 5 additions & 7 deletions pkg/sql/execstats/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
)

// Modifies TraceAnalyzer internal state to add stats for the processor/stream/flow specified
// in stats.ComponentID and the given node ID.
// AddComponentStats modifies TraceAnalyzer internal state to add stats for the
// processor/stream/flow specified in stats.ComponentID and the given node ID.
func (a *TraceAnalyzer) AddComponentStats(
nodeID roachpb.NodeID, stats *execinfrapb.ComponentStats,
) {
Expand All @@ -40,13 +40,11 @@ func (a *TraceAnalyzer) AddComponentStats(
}
a.FlowMetadata.streamStats[execinfrapb.StreamID(stats.Component.ID)] = streamStat
default:
flowStat := &flowStats{
nodeID: nodeID,
}
flowStat := &flowStats{}
flowStat.stats = append(flowStat.stats, stats)
if a.FlowMetadata.flowStats == nil {
a.FlowMetadata.flowStats = make(map[execinfrapb.FlowID]*flowStats)
a.FlowMetadata.flowStats = make(map[roachpb.NodeID]*flowStats)
}
a.FlowMetadata.flowStats[stats.Component.FlowID] = flowStat
a.FlowMetadata.flowStats[nodeID] = flowStat
}
}
42 changes: 22 additions & 20 deletions pkg/sql/opt/exec/execbuilder/testdata/select
Original file line number Diff line number Diff line change
Expand Up @@ -15,29 +15,31 @@ SET vectorize=off; SET tracing = on; BEGIN; SELECT 1; COMMIT; SELECT 2; SET trac
# how many commands we ran in the session.
query ITT
SELECT
span, regexp_replace(message, 'pos:[0-9]*', 'pos:?'), operation
span, regexp_replace(regexp_replace(message, 'pos:[0-9]*', 'pos:?'), 'flowid: [0-9A-Fa-f-]*', 'flowid: ?'), operation
FROM [SHOW TRACE FOR SESSION]
WHERE message LIKE '%SPAN START%' OR message LIKE '%pos%executing%';
----
0 === SPAN START: session recording === session recording
1 === SPAN START: exec stmt === exec stmt
1 [NoTxn pos:?] executing ExecStmt: BEGIN TRANSACTION exec stmt
2 === SPAN START: sql txn === sql txn
3 === SPAN START: exec stmt === exec stmt
3 [Open pos:?] executing ExecStmt: SELECT 1 exec stmt
4 === SPAN START: consuming rows === consuming rows
5 === SPAN START: flow === flow
6 === SPAN START: exec stmt === exec stmt
6 [Open pos:?] executing ExecStmt: COMMIT TRANSACTION exec stmt
7 === SPAN START: exec stmt === exec stmt
7 [NoTxn pos:?] executing ExecStmt: SELECT 2 exec stmt
8 === SPAN START: sql txn === sql txn
9 === SPAN START: exec stmt === exec stmt
9 [Open pos:?] executing ExecStmt: SELECT 2 exec stmt
10 === SPAN START: consuming rows === consuming rows
11 === SPAN START: flow === flow
12 === SPAN START: exec stmt === exec stmt
12 [NoTxn pos:?] executing ExecStmt: SET TRACING = off exec stmt
0 === SPAN START: session recording === session recording
1 === SPAN START: exec stmt === exec stmt
1 [NoTxn pos:?] executing ExecStmt: BEGIN TRANSACTION exec stmt
2 === SPAN START: sql txn === sql txn
3 === SPAN START: exec stmt === exec stmt
3 [Open pos:?] executing ExecStmt: SELECT 1 exec stmt
4 === SPAN START: consuming rows === consuming rows
5 === SPAN START: flow ===
cockroach.flowid: ? flow
6 === SPAN START: exec stmt === exec stmt
6 [Open pos:?] executing ExecStmt: COMMIT TRANSACTION exec stmt
7 === SPAN START: exec stmt === exec stmt
7 [NoTxn pos:?] executing ExecStmt: SELECT 2 exec stmt
8 === SPAN START: sql txn === sql txn
9 === SPAN START: exec stmt === exec stmt
9 [Open pos:?] executing ExecStmt: SELECT 2 exec stmt
10 === SPAN START: consuming rows === consuming rows
11 === SPAN START: flow ===
cockroach.flowid: ? flow
12 === SPAN START: exec stmt === exec stmt
12 [NoTxn pos:?] executing ExecStmt: SET TRACING = off exec stmt

# ------------------------------------------------------------------------------
# Numeric References Tests.
Expand Down

0 comments on commit cb7ee1c

Please sign in to comment.