diff --git a/pkg/sql/execstats/BUILD.bazel b/pkg/sql/execstats/BUILD.bazel index 758040b22009..dede9fd5d79d 100644 --- a/pkg/sql/execstats/BUILD.bazel +++ b/pkg/sql/execstats/BUILD.bazel @@ -20,9 +20,10 @@ go_test( srcs = [ "main_test.go", "traceanalyzer_test.go", + "utils_test.go", ], + embed = [":execstats"], deps = [ - ":execstats", "//pkg/base", "//pkg/roachpb", "//pkg/security", @@ -37,7 +38,10 @@ go_test( "//pkg/testutils/testcluster", "//pkg/util/leaktest", "//pkg/util/log", + "//pkg/util/optional", "//pkg/util/tracing", + "//pkg/util/uuid", + "//vendor/github.com/stretchr/testify/assert", "//vendor/github.com/stretchr/testify/require", ], ) diff --git a/pkg/sql/execstats/traceanalyzer.go b/pkg/sql/execstats/traceanalyzer.go index 6f609604f727..91639267587e 100644 --- a/pkg/sql/execstats/traceanalyzer.go +++ b/pkg/sql/execstats/traceanalyzer.go @@ -12,6 +12,7 @@ package execstats import ( "strconv" + "time" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" @@ -21,6 +22,7 @@ import ( "github.com/gogo/protobuf/types" ) +// processorStats contains stats for a specific processor extracted from a trace. type processorStats struct { nodeID roachpb.NodeID stats *execinfrapb.ComponentStats @@ -87,12 +89,18 @@ func NewFlowMetadata(flows map[roachpb.NodeID]*execinfrapb.FlowSpec) *FlowMetada type NodeLevelStats struct { NetworkBytesSentGroupedByNode map[roachpb.NodeID]int64 MaxMemoryUsageGroupedByNode map[roachpb.NodeID]int64 + KVBytesReadGroupedByNode map[roachpb.NodeID]int64 + KVRowsReadGroupedByNode map[roachpb.NodeID]int64 + KVTimeGroupedByNode map[roachpb.NodeID]time.Duration } // QueryLevelStats returns all the query level stats that correspond to the given traces and flow metadata. type QueryLevelStats struct { NetworkBytesSent int64 MaxMemUsage int64 + KVBytesRead int64 + KVRowsRead int64 + KVTime time.Duration } // TraceAnalyzer is a struct that helps calculate top-level statistics from a @@ -185,9 +193,22 @@ func (a *TraceAnalyzer) ProcessStats() error { a.nodeLevelStats = NodeLevelStats{ NetworkBytesSentGroupedByNode: make(map[roachpb.NodeID]int64), MaxMemoryUsageGroupedByNode: make(map[roachpb.NodeID]int64), + KVBytesReadGroupedByNode: make(map[roachpb.NodeID]int64), + KVRowsReadGroupedByNode: make(map[roachpb.NodeID]int64), + KVTimeGroupedByNode: make(map[roachpb.NodeID]time.Duration), } var errs error + // Process processorStats. + for _, stats := range a.processorStats { + if stats.stats == nil { + continue + } + a.nodeLevelStats.KVBytesReadGroupedByNode[stats.nodeID] += int64(stats.stats.KV.BytesRead.Value()) + a.nodeLevelStats.KVRowsReadGroupedByNode[stats.nodeID] += int64(stats.stats.KV.TuplesRead.Value()) + a.nodeLevelStats.KVTimeGroupedByNode[stats.nodeID] += stats.stats.KV.KVTime.Value() + } + // Process streamStats. for _, stats := range a.streamStats { if stats.stats == nil { @@ -238,6 +259,9 @@ func (a *TraceAnalyzer) ProcessStats() error { a.queryLevelStats = QueryLevelStats{ NetworkBytesSent: int64(0), MaxMemUsage: int64(0), + KVBytesRead: int64(0), + KVRowsRead: int64(0), + KVTime: time.Duration(0), } for _, bytesSentByNode := range a.nodeLevelStats.NetworkBytesSentGroupedByNode { @@ -249,6 +273,18 @@ func (a *TraceAnalyzer) ProcessStats() error { a.queryLevelStats.MaxMemUsage = maxMemUsage } } + + for _, kvBytesRead := range a.nodeLevelStats.KVBytesReadGroupedByNode { + a.queryLevelStats.KVBytesRead += kvBytesRead + } + + for _, kvRowsRead := range a.nodeLevelStats.KVRowsReadGroupedByNode { + a.queryLevelStats.KVRowsRead += kvRowsRead + } + + for _, kvTime := range a.nodeLevelStats.KVTimeGroupedByNode { + a.queryLevelStats.KVTime += kvTime + } return errs } diff --git a/pkg/sql/execstats/traceanalyzer_test.go b/pkg/sql/execstats/traceanalyzer_test.go index 0caf61f1b7af..35056de62a13 100644 --- a/pkg/sql/execstats/traceanalyzer_test.go +++ b/pkg/sql/execstats/traceanalyzer_test.go @@ -13,7 +13,9 @@ package execstats_test import ( "context" "fmt" + "reflect" "testing" + "time" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -27,7 +29,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/optional" "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -133,41 +138,72 @@ func TestTraceAnalyzer(t *testing.T) { } } - t.Run("NetworkBytesSent", func(t *testing.T) { - for _, analyzer := range []*execstats.TraceAnalyzer{ - rowexecTraceAnalyzer, colexecTraceAnalyzer, - } { - nodeLevelStats := analyzer.GetNodeLevelStats() - require.Equal( - t, numNodes-1, len(nodeLevelStats.NetworkBytesSentGroupedByNode), "expected all nodes minus the gateway node to have sent bytes", - ) - - queryLevelStats := analyzer.GetQueryLevelStats() - - // The stats don't count the actual bytes, but they are a synthetic value - // based on the number of tuples. In this test 21 tuples flow over the - // network. - require.Equal(t, queryLevelStats.NetworkBytesSent, int64(21*8)) - } - }) - - t.Run("MaxMemoryUsage", func(t *testing.T) { - for _, tc := range []struct { - analyzer *execstats.TraceAnalyzer - expectedMaxMemUsage int64 - }{ - { - analyzer: rowexecTraceAnalyzer, - expectedMaxMemUsage: int64(20480), + for _, tc := range []struct { + analyzer *execstats.TraceAnalyzer + expectedMaxMemUsage int64 + }{ + { + analyzer: rowexecTraceAnalyzer, + expectedMaxMemUsage: int64(20480), + }, + { + analyzer: colexecTraceAnalyzer, + expectedMaxMemUsage: int64(30720), + }, + } { + nodeLevelStats := tc.analyzer.GetNodeLevelStats() + require.Equal( + t, numNodes-1, len(nodeLevelStats.NetworkBytesSentGroupedByNode), "expected all nodes minus the gateway node to have sent bytes", + ) + + queryLevelStats := tc.analyzer.GetQueryLevelStats() + + // The stats don't count the actual bytes, but they are a synthetic value + // based on the number of tuples. In this test 21 tuples flow over the + // network. + require.Equal(t, int64(21*8), queryLevelStats.NetworkBytesSent) + + require.Equal(t, tc.expectedMaxMemUsage, queryLevelStats.MaxMemUsage) + + require.Equal(t, int64(30), queryLevelStats.KVRowsRead) + // For tests, the bytes read is based on the number of rows read, rather + // than actual bytes read. + require.Equal(t, int64(30*8), queryLevelStats.KVBytesRead) + } +} + +func TestTraceAnalyzerProcessStats(t *testing.T) { + a := &execstats.TraceAnalyzer{FlowMetadata: &execstats.FlowMetadata{}} + a.AddComponentStats( + 1, /* nodeID */ + &execinfrapb.ComponentStats{ + Component: execinfrapb.ProcessorComponentID( + execinfrapb.FlowID{UUID: uuid.MakeV4()}, + 1, /* processorID */ + ), + KV: execinfrapb.KVStats{ + KVTime: optional.MakeTimeValue(3 * time.Second), }, - { - analyzer: colexecTraceAnalyzer, - expectedMaxMemUsage: int64(30720), + }, + ) + + a.AddComponentStats( + 2, /* nodeID */ + &execinfrapb.ComponentStats{ + Component: execinfrapb.ProcessorComponentID( + execinfrapb.FlowID{UUID: uuid.MakeV4()}, + 2, /* processorID */ + ), + KV: execinfrapb.KVStats{ + KVTime: optional.MakeTimeValue(5 * time.Second), }, - } { - queryLevelStats := tc.analyzer.GetQueryLevelStats() + }, + ) - require.Equal(t, tc.expectedMaxMemUsage, queryLevelStats.MaxMemUsage) - } - }) + expected := execstats.QueryLevelStats{KVTime: 8 * time.Second} + + assert.NoError(t, a.ProcessStats()) + if got := a.GetQueryLevelStats(); !reflect.DeepEqual(got, expected) { + t.Errorf("ProcessStats() = %v, want %v", got, expected) + } } diff --git a/pkg/sql/execstats/utils_test.go b/pkg/sql/execstats/utils_test.go new file mode 100644 index 000000000000..dadfec8dfaa0 --- /dev/null +++ b/pkg/sql/execstats/utils_test.go @@ -0,0 +1,52 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package execstats + +import ( + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" +) + +// Modifies TraceAnalyzer internal state to add stats for the processor/stream/flow specified +// in stats.ComponentID and the given node ID. +func (a *TraceAnalyzer) AddComponentStats( + nodeID roachpb.NodeID, stats *execinfrapb.ComponentStats, +) { + switch stats.Component.Type { + case execinfrapb.ComponentID_PROCESSOR: + processorStat := &processorStats{ + nodeID: nodeID, + stats: stats, + } + if a.FlowMetadata.processorStats == nil { + a.FlowMetadata.processorStats = make(map[execinfrapb.ProcessorID]*processorStats) + } + a.FlowMetadata.processorStats[execinfrapb.ProcessorID(stats.Component.ID)] = processorStat + case execinfrapb.ComponentID_STREAM: + streamStat := &streamStats{ + originNodeID: nodeID, + stats: stats, + } + if a.FlowMetadata.streamStats == nil { + a.FlowMetadata.streamStats = make(map[execinfrapb.StreamID]*streamStats) + } + a.FlowMetadata.streamStats[execinfrapb.StreamID(stats.Component.ID)] = streamStat + default: + flowStat := &flowStats{ + nodeID: nodeID, + } + flowStat.stats = append(flowStat.stats, stats) + if a.FlowMetadata.flowStats == nil { + a.FlowMetadata.flowStats = make(map[execinfrapb.FlowID]*flowStats) + } + a.FlowMetadata.flowStats[stats.Component.FlowID] = flowStat + } +}