Skip to content

Commit

Permalink
sql: add rows/bytes read from KV to TraceAnalyzer
Browse files Browse the repository at this point in the history
This commit adds total bytes and total rows read from KV to the
TraceAnalyzer.

Release note: None.
  • Loading branch information
cathymw committed Dec 18, 2020
1 parent fa312ba commit 1a9af6c
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 36 deletions.
25 changes: 25 additions & 0 deletions pkg/sql/execstats/traceanalyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,16 @@ func NewFlowMetadata(flows map[roachpb.NodeID]*execinfrapb.FlowSpec) *FlowMetada
type NodeLevelStats struct {
NetworkBytesSentGroupedByNode map[roachpb.NodeID]int64
MaxMemoryUsageGroupedByNode map[roachpb.NodeID]int64
KVBytesReadGroupedByNode map[roachpb.NodeID]int64
KVRowsReadGroupedByNode map[roachpb.NodeID]int64
}

// QueryLevelStats returns all the query level stats that correspond to the given traces and flow metadata.
type QueryLevelStats struct {
NetworkBytesSent int64
MaxMemUsage int64
KVBytesRead int64
KVRowsRead int64
}

// TraceAnalyzer is a struct that helps calculate top-level statistics from a
Expand Down Expand Up @@ -185,9 +189,20 @@ func (a *TraceAnalyzer) ProcessStats() error {
a.nodeLevelStats = NodeLevelStats{
NetworkBytesSentGroupedByNode: make(map[roachpb.NodeID]int64),
MaxMemoryUsageGroupedByNode: make(map[roachpb.NodeID]int64),
KVBytesReadGroupedByNode: make(map[roachpb.NodeID]int64),
KVRowsReadGroupedByNode: make(map[roachpb.NodeID]int64),
}
var errs error

// Process processorStats.
for _, stats := range a.processorStats {
if stats.stats == nil {
continue
}
a.nodeLevelStats.KVBytesReadGroupedByNode[stats.nodeID] += int64(stats.stats.KV.BytesRead.Value())
a.nodeLevelStats.KVRowsReadGroupedByNode[stats.nodeID] += int64(stats.stats.KV.TuplesRead.Value())
}

// Process streamStats.
for _, stats := range a.streamStats {
if stats.stats == nil {
Expand Down Expand Up @@ -238,6 +253,8 @@ func (a *TraceAnalyzer) ProcessStats() error {
a.queryLevelStats = QueryLevelStats{
NetworkBytesSent: int64(0),
MaxMemUsage: int64(0),
KVBytesRead: int64(0),
KVRowsRead: int64(0),
}

for _, bytesSentByNode := range a.nodeLevelStats.NetworkBytesSentGroupedByNode {
Expand All @@ -249,6 +266,14 @@ func (a *TraceAnalyzer) ProcessStats() error {
a.queryLevelStats.MaxMemUsage = maxMemUsage
}
}

for _, kvBytesRead := range a.nodeLevelStats.KVBytesReadGroupedByNode {
a.queryLevelStats.KVBytesRead += kvBytesRead
}

for _, kvRowsRead := range a.nodeLevelStats.KVRowsReadGroupedByNode {
a.queryLevelStats.KVRowsRead += kvRowsRead
}
return errs
}

Expand Down
67 changes: 31 additions & 36 deletions pkg/sql/execstats/traceanalyzer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,41 +133,36 @@ func TestTraceAnalyzer(t *testing.T) {
}
}

t.Run("NetworkBytesSent", func(t *testing.T) {
for _, analyzer := range []*execstats.TraceAnalyzer{
rowexecTraceAnalyzer, colexecTraceAnalyzer,
} {
nodeLevelStats := analyzer.GetNodeLevelStats()
require.Equal(
t, numNodes-1, len(nodeLevelStats.NetworkBytesSentGroupedByNode), "expected all nodes minus the gateway node to have sent bytes",
)

queryLevelStats := analyzer.GetQueryLevelStats()

// The stats don't count the actual bytes, but they are a synthetic value
// based on the number of tuples. In this test 21 tuples flow over the
// network.
require.Equal(t, queryLevelStats.NetworkBytesSent, int64(21*8))
}
})

t.Run("MaxMemoryUsage", func(t *testing.T) {
for _, tc := range []struct {
analyzer *execstats.TraceAnalyzer
expectedMaxMemUsage int64
}{
{
analyzer: rowexecTraceAnalyzer,
expectedMaxMemUsage: int64(20480),
},
{
analyzer: colexecTraceAnalyzer,
expectedMaxMemUsage: int64(30720),
},
} {
queryLevelStats := tc.analyzer.GetQueryLevelStats()
for _, tc := range []struct {
analyzer *execstats.TraceAnalyzer
expectedMaxMemUsage int64
}{
{
analyzer: rowexecTraceAnalyzer,
expectedMaxMemUsage: int64(20480),
},
{
analyzer: colexecTraceAnalyzer,
expectedMaxMemUsage: int64(30720),
},
} {
nodeLevelStats := tc.analyzer.GetNodeLevelStats()
require.Equal(
t, numNodes-1, len(nodeLevelStats.NetworkBytesSentGroupedByNode), "expected all nodes minus the gateway node to have sent bytes",
)

require.Equal(t, tc.expectedMaxMemUsage, queryLevelStats.MaxMemUsage)
}
})
queryLevelStats := tc.analyzer.GetQueryLevelStats()

// The stats don't count the actual bytes, but they are a synthetic value
// based on the number of tuples. In this test 21 tuples flow over the
// network.
require.Equal(t, int64(21*8), queryLevelStats.NetworkBytesSent)

require.Equal(t, tc.expectedMaxMemUsage, queryLevelStats.MaxMemUsage)

require.Equal(t, int64(30), queryLevelStats.KVRowsRead)
// For tests, the bytes read is based on the number of rows read, rather
// than actual bytes read.
require.Equal(t, int64(30*8), queryLevelStats.KVBytesRead)
}
}

0 comments on commit 1a9af6c

Please sign in to comment.