Skip to content

Commit

Permalink
Merge #57495
Browse files Browse the repository at this point in the history
57495: sql: add KV stats to TraceAnalyzer  r=asubiotto a=cathymw

The first commit adds total bytes and total rows read from KV to the TraceAnalyzer.
Closes: #56605

The second commit adds cumulative time spent in KV to the TraceAnalyzer.
Closes: #56604

Release note: None.

Co-authored-by: Cathy <[email protected]>
  • Loading branch information
craig[bot] and cathymw committed Dec 18, 2020
2 parents 05da184 + dc39c1d commit 1697431
Show file tree
Hide file tree
Showing 4 changed files with 163 additions and 35 deletions.
6 changes: 5 additions & 1 deletion pkg/sql/execstats/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ go_test(
srcs = [
"main_test.go",
"traceanalyzer_test.go",
"utils_test.go",
],
embed = [":execstats"],
deps = [
":execstats",
"//pkg/base",
"//pkg/roachpb",
"//pkg/security",
Expand All @@ -37,7 +38,10 @@ go_test(
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/optional",
"//pkg/util/tracing",
"//pkg/util/uuid",
"//vendor/github.com/stretchr/testify/assert",
"//vendor/github.com/stretchr/testify/require",
],
)
36 changes: 36 additions & 0 deletions pkg/sql/execstats/traceanalyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package execstats

import (
"strconv"
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
Expand All @@ -21,6 +22,7 @@ import (
"github.com/gogo/protobuf/types"
)

// processorStats contains stats for a specific processor extracted from a trace.
type processorStats struct {
nodeID roachpb.NodeID
stats *execinfrapb.ComponentStats
Expand Down Expand Up @@ -87,12 +89,18 @@ func NewFlowMetadata(flows map[roachpb.NodeID]*execinfrapb.FlowSpec) *FlowMetada
type NodeLevelStats struct {
// NOTE(review): the code that fills the next two maps is not visible in
// this view; by name they hold, per node, the network bytes sent and the
// maximum memory usage — confirm against ProcessStats.
NetworkBytesSentGroupedByNode map[roachpb.NodeID]int64
MaxMemoryUsageGroupedByNode map[roachpb.NodeID]int64
// KVBytesReadGroupedByNode maps a node ID to the sum of KV.BytesRead over
// all processor stats recorded for that node.
KVBytesReadGroupedByNode map[roachpb.NodeID]int64
// KVRowsReadGroupedByNode maps a node ID to the sum of KV.TuplesRead over
// all processor stats recorded for that node.
KVRowsReadGroupedByNode map[roachpb.NodeID]int64
// KVTimeGroupedByNode maps a node ID to the sum of KV.KVTime over all
// processor stats recorded for that node.
KVTimeGroupedByNode map[roachpb.NodeID]time.Duration
}

// QueryLevelStats contains all the query level stats that correspond to the
// given traces and flow metadata.
type QueryLevelStats struct {
// NOTE(review): ProcessStats derives NetworkBytesSent by iterating over
// NodeLevelStats.NetworkBytesSentGroupedByNode; the loop body is not
// visible here — presumably a sum across nodes, confirm.
NetworkBytesSent int64
// NOTE(review): ProcessStats assigns MaxMemUsage from a per-node value
// under a condition that is not visible here — presumably the maximum
// across nodes, confirm.
MaxMemUsage int64
// KVBytesRead is the sum of NodeLevelStats.KVBytesReadGroupedByNode over
// all nodes.
KVBytesRead int64
// KVRowsRead is the sum of NodeLevelStats.KVRowsReadGroupedByNode over all
// nodes.
KVRowsRead int64
// KVTime is the sum of NodeLevelStats.KVTimeGroupedByNode over all nodes.
KVTime time.Duration
}

// TraceAnalyzer is a struct that helps calculate top-level statistics from a
Expand Down Expand Up @@ -185,9 +193,22 @@ func (a *TraceAnalyzer) ProcessStats() error {
a.nodeLevelStats = NodeLevelStats{
NetworkBytesSentGroupedByNode: make(map[roachpb.NodeID]int64),
MaxMemoryUsageGroupedByNode: make(map[roachpb.NodeID]int64),
KVBytesReadGroupedByNode: make(map[roachpb.NodeID]int64),
KVRowsReadGroupedByNode: make(map[roachpb.NodeID]int64),
KVTimeGroupedByNode: make(map[roachpb.NodeID]time.Duration),
}
var errs error

// Process processorStats.
for _, stats := range a.processorStats {
if stats.stats == nil {
continue
}
a.nodeLevelStats.KVBytesReadGroupedByNode[stats.nodeID] += int64(stats.stats.KV.BytesRead.Value())
a.nodeLevelStats.KVRowsReadGroupedByNode[stats.nodeID] += int64(stats.stats.KV.TuplesRead.Value())
a.nodeLevelStats.KVTimeGroupedByNode[stats.nodeID] += stats.stats.KV.KVTime.Value()
}

// Process streamStats.
for _, stats := range a.streamStats {
if stats.stats == nil {
Expand Down Expand Up @@ -238,6 +259,9 @@ func (a *TraceAnalyzer) ProcessStats() error {
a.queryLevelStats = QueryLevelStats{
NetworkBytesSent: int64(0),
MaxMemUsage: int64(0),
KVBytesRead: int64(0),
KVRowsRead: int64(0),
KVTime: time.Duration(0),
}

for _, bytesSentByNode := range a.nodeLevelStats.NetworkBytesSentGroupedByNode {
Expand All @@ -249,6 +273,18 @@ func (a *TraceAnalyzer) ProcessStats() error {
a.queryLevelStats.MaxMemUsage = maxMemUsage
}
}

for _, kvBytesRead := range a.nodeLevelStats.KVBytesReadGroupedByNode {
a.queryLevelStats.KVBytesRead += kvBytesRead
}

for _, kvRowsRead := range a.nodeLevelStats.KVRowsReadGroupedByNode {
a.queryLevelStats.KVRowsRead += kvRowsRead
}

for _, kvTime := range a.nodeLevelStats.KVTimeGroupedByNode {
a.queryLevelStats.KVTime += kvTime
}
return errs
}

Expand Down
104 changes: 70 additions & 34 deletions pkg/sql/execstats/traceanalyzer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ package execstats_test
import (
"context"
"fmt"
"reflect"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand All @@ -27,7 +29,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/optional"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -133,41 +138,72 @@ func TestTraceAnalyzer(t *testing.T) {
}
}

t.Run("NetworkBytesSent", func(t *testing.T) {
for _, analyzer := range []*execstats.TraceAnalyzer{
rowexecTraceAnalyzer, colexecTraceAnalyzer,
} {
nodeLevelStats := analyzer.GetNodeLevelStats()
require.Equal(
t, numNodes-1, len(nodeLevelStats.NetworkBytesSentGroupedByNode), "expected all nodes minus the gateway node to have sent bytes",
)

queryLevelStats := analyzer.GetQueryLevelStats()

// The stats don't count the actual bytes, but they are a synthetic value
// based on the number of tuples. In this test 21 tuples flow over the
// network.
require.Equal(t, queryLevelStats.NetworkBytesSent, int64(21*8))
}
})

t.Run("MaxMemoryUsage", func(t *testing.T) {
for _, tc := range []struct {
analyzer *execstats.TraceAnalyzer
expectedMaxMemUsage int64
}{
{
analyzer: rowexecTraceAnalyzer,
expectedMaxMemUsage: int64(20480),
for _, tc := range []struct {
analyzer *execstats.TraceAnalyzer
expectedMaxMemUsage int64
}{
{
analyzer: rowexecTraceAnalyzer,
expectedMaxMemUsage: int64(20480),
},
{
analyzer: colexecTraceAnalyzer,
expectedMaxMemUsage: int64(30720),
},
} {
nodeLevelStats := tc.analyzer.GetNodeLevelStats()
require.Equal(
t, numNodes-1, len(nodeLevelStats.NetworkBytesSentGroupedByNode), "expected all nodes minus the gateway node to have sent bytes",
)

queryLevelStats := tc.analyzer.GetQueryLevelStats()

// The stats don't count the actual bytes, but they are a synthetic value
// based on the number of tuples. In this test 21 tuples flow over the
// network.
require.Equal(t, int64(21*8), queryLevelStats.NetworkBytesSent)

require.Equal(t, tc.expectedMaxMemUsage, queryLevelStats.MaxMemUsage)

require.Equal(t, int64(30), queryLevelStats.KVRowsRead)
// For tests, the bytes read is based on the number of rows read, rather
// than actual bytes read.
require.Equal(t, int64(30*8), queryLevelStats.KVBytesRead)
}
}

func TestTraceAnalyzerProcessStats(t *testing.T) {
a := &execstats.TraceAnalyzer{FlowMetadata: &execstats.FlowMetadata{}}
a.AddComponentStats(
1, /* nodeID */
&execinfrapb.ComponentStats{
Component: execinfrapb.ProcessorComponentID(
execinfrapb.FlowID{UUID: uuid.MakeV4()},
1, /* processorID */
),
KV: execinfrapb.KVStats{
KVTime: optional.MakeTimeValue(3 * time.Second),
},
{
analyzer: colexecTraceAnalyzer,
expectedMaxMemUsage: int64(30720),
},
)

a.AddComponentStats(
2, /* nodeID */
&execinfrapb.ComponentStats{
Component: execinfrapb.ProcessorComponentID(
execinfrapb.FlowID{UUID: uuid.MakeV4()},
2, /* processorID */
),
KV: execinfrapb.KVStats{
KVTime: optional.MakeTimeValue(5 * time.Second),
},
} {
queryLevelStats := tc.analyzer.GetQueryLevelStats()
},
)

require.Equal(t, tc.expectedMaxMemUsage, queryLevelStats.MaxMemUsage)
}
})
expected := execstats.QueryLevelStats{KVTime: 8 * time.Second}

assert.NoError(t, a.ProcessStats())
if got := a.GetQueryLevelStats(); !reflect.DeepEqual(got, expected) {
t.Errorf("ProcessStats() = %v, want %v", got, expected)
}
}
52 changes: 52 additions & 0 deletions pkg/sql/execstats/utils_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package execstats

import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
)

// AddComponentStats is a test-only helper that stores stats directly in the
// TraceAnalyzer's FlowMetadata on behalf of the given node ID. Processor and
// stream stats are keyed by stats.Component.ID; stats for any other component
// type are stored as flow stats keyed by stats.Component.FlowID. An entry that
// already exists under the same key is replaced.
func (a *TraceAnalyzer) AddComponentStats(
	nodeID roachpb.NodeID, stats *execinfrapb.ComponentStats,
) {
	component := &stats.Component
	meta := a.FlowMetadata

	switch component.Type {
	case execinfrapb.ComponentID_PROCESSOR:
		// The maps are created on demand so that an empty FlowMetadata works.
		if meta.processorStats == nil {
			meta.processorStats = make(map[execinfrapb.ProcessorID]*processorStats)
		}
		key := execinfrapb.ProcessorID(component.ID)
		meta.processorStats[key] = &processorStats{nodeID: nodeID, stats: stats}

	case execinfrapb.ComponentID_STREAM:
		if meta.streamStats == nil {
			meta.streamStats = make(map[execinfrapb.StreamID]*streamStats)
		}
		key := execinfrapb.StreamID(component.ID)
		meta.streamStats[key] = &streamStats{originNodeID: nodeID, stats: stats}

	default:
		// Anything that is neither a processor nor a stream is recorded as a
		// flow-level stat holding this single ComponentStats.
		if meta.flowStats == nil {
			meta.flowStats = make(map[execinfrapb.FlowID]*flowStats)
		}
		entry := &flowStats{nodeID: nodeID}
		entry.stats = append(entry.stats, stats)
		meta.flowStats[component.FlowID] = entry
	}
}

0 comments on commit 1697431

Please sign in to comment.