From 1a9af6ca8e3ea04d2edc5efe8e5530d5847cdc0b Mon Sep 17 00:00:00 2001 From: Cathy Date: Thu, 3 Dec 2020 13:30:01 -0500 Subject: [PATCH 1/2] sql: add rows/bytes read from KV to TraceAnalyzer This commit adds total bytes and total rows read from KV to the TraceAnalyzer. Release note: None. --- pkg/sql/execstats/traceanalyzer.go | 25 +++++++++ pkg/sql/execstats/traceanalyzer_test.go | 67 ++++++++++++------------- 2 files changed, 56 insertions(+), 36 deletions(-) diff --git a/pkg/sql/execstats/traceanalyzer.go b/pkg/sql/execstats/traceanalyzer.go index 6f609604f727..3748e7d3122a 100644 --- a/pkg/sql/execstats/traceanalyzer.go +++ b/pkg/sql/execstats/traceanalyzer.go @@ -87,12 +87,16 @@ func NewFlowMetadata(flows map[roachpb.NodeID]*execinfrapb.FlowSpec) *FlowMetada type NodeLevelStats struct { NetworkBytesSentGroupedByNode map[roachpb.NodeID]int64 MaxMemoryUsageGroupedByNode map[roachpb.NodeID]int64 + KVBytesReadGroupedByNode map[roachpb.NodeID]int64 + KVRowsReadGroupedByNode map[roachpb.NodeID]int64 } // QueryLevelStats returns all the query level stats that correspond to the given traces and flow metadata. type QueryLevelStats struct { NetworkBytesSent int64 MaxMemUsage int64 + KVBytesRead int64 + KVRowsRead int64 } // TraceAnalyzer is a struct that helps calculate top-level statistics from a @@ -185,9 +189,20 @@ func (a *TraceAnalyzer) ProcessStats() error { a.nodeLevelStats = NodeLevelStats{ NetworkBytesSentGroupedByNode: make(map[roachpb.NodeID]int64), MaxMemoryUsageGroupedByNode: make(map[roachpb.NodeID]int64), + KVBytesReadGroupedByNode: make(map[roachpb.NodeID]int64), + KVRowsReadGroupedByNode: make(map[roachpb.NodeID]int64), } var errs error + // Process processorStats. 
+ for _, stats := range a.processorStats { + if stats.stats == nil { + continue + } + a.nodeLevelStats.KVBytesReadGroupedByNode[stats.nodeID] += int64(stats.stats.KV.BytesRead.Value()) + a.nodeLevelStats.KVRowsReadGroupedByNode[stats.nodeID] += int64(stats.stats.KV.TuplesRead.Value()) + } + // Process streamStats. for _, stats := range a.streamStats { if stats.stats == nil { @@ -238,6 +253,8 @@ func (a *TraceAnalyzer) ProcessStats() error { a.queryLevelStats = QueryLevelStats{ NetworkBytesSent: int64(0), MaxMemUsage: int64(0), + KVBytesRead: int64(0), + KVRowsRead: int64(0), } for _, bytesSentByNode := range a.nodeLevelStats.NetworkBytesSentGroupedByNode { @@ -249,6 +266,14 @@ func (a *TraceAnalyzer) ProcessStats() error { a.queryLevelStats.MaxMemUsage = maxMemUsage } } + + for _, kvBytesRead := range a.nodeLevelStats.KVBytesReadGroupedByNode { + a.queryLevelStats.KVBytesRead += kvBytesRead + } + + for _, kvRowsRead := range a.nodeLevelStats.KVRowsReadGroupedByNode { + a.queryLevelStats.KVRowsRead += kvRowsRead + } return errs } diff --git a/pkg/sql/execstats/traceanalyzer_test.go b/pkg/sql/execstats/traceanalyzer_test.go index 0caf61f1b7af..cd732c2ffd01 100644 --- a/pkg/sql/execstats/traceanalyzer_test.go +++ b/pkg/sql/execstats/traceanalyzer_test.go @@ -133,41 +133,36 @@ func TestTraceAnalyzer(t *testing.T) { } } - t.Run("NetworkBytesSent", func(t *testing.T) { - for _, analyzer := range []*execstats.TraceAnalyzer{ - rowexecTraceAnalyzer, colexecTraceAnalyzer, - } { - nodeLevelStats := analyzer.GetNodeLevelStats() - require.Equal( - t, numNodes-1, len(nodeLevelStats.NetworkBytesSentGroupedByNode), "expected all nodes minus the gateway node to have sent bytes", - ) - - queryLevelStats := analyzer.GetQueryLevelStats() - - // The stats don't count the actual bytes, but they are a synthetic value - // based on the number of tuples. In this test 21 tuples flow over the - // network. 
- require.Equal(t, queryLevelStats.NetworkBytesSent, int64(21*8)) - } - }) - - t.Run("MaxMemoryUsage", func(t *testing.T) { - for _, tc := range []struct { - analyzer *execstats.TraceAnalyzer - expectedMaxMemUsage int64 - }{ - { - analyzer: rowexecTraceAnalyzer, - expectedMaxMemUsage: int64(20480), - }, - { - analyzer: colexecTraceAnalyzer, - expectedMaxMemUsage: int64(30720), - }, - } { - queryLevelStats := tc.analyzer.GetQueryLevelStats() + for _, tc := range []struct { + analyzer *execstats.TraceAnalyzer + expectedMaxMemUsage int64 + }{ + { + analyzer: rowexecTraceAnalyzer, + expectedMaxMemUsage: int64(20480), + }, + { + analyzer: colexecTraceAnalyzer, + expectedMaxMemUsage: int64(30720), + }, + } { + nodeLevelStats := tc.analyzer.GetNodeLevelStats() + require.Equal( + t, numNodes-1, len(nodeLevelStats.NetworkBytesSentGroupedByNode), "expected all nodes minus the gateway node to have sent bytes", + ) - require.Equal(t, tc.expectedMaxMemUsage, queryLevelStats.MaxMemUsage) - } - }) + queryLevelStats := tc.analyzer.GetQueryLevelStats() + + // The stats don't count the actual bytes, but they are a synthetic value + // based on the number of tuples. In this test 21 tuples flow over the + // network. + require.Equal(t, int64(21*8), queryLevelStats.NetworkBytesSent) + + require.Equal(t, tc.expectedMaxMemUsage, queryLevelStats.MaxMemUsage) + + require.Equal(t, int64(30), queryLevelStats.KVRowsRead) + // For tests, the bytes read is based on the number of rows read, rather + // than actual bytes read. + require.Equal(t, int64(30*8), queryLevelStats.KVBytesRead) + } } From dc39c1d216bb69964244d27530dd183b741b2539 Mon Sep 17 00:00:00 2001 From: Cathy Date: Thu, 3 Dec 2020 16:09:17 -0500 Subject: [PATCH 2/2] sql: add cumulative KV time to TraceAnalyzer This commit adds the total time spent waiting for KV requests to TraceAnalyzer. Release note: None. 
--- pkg/sql/execstats/BUILD.bazel | 6 ++- pkg/sql/execstats/traceanalyzer.go | 11 ++++++ pkg/sql/execstats/traceanalyzer_test.go | 41 +++++++++++++++++++ pkg/sql/execstats/utils_test.go | 52 +++++++++++++++++++++++++ 4 files changed, 109 insertions(+), 1 deletion(-) create mode 100644 pkg/sql/execstats/utils_test.go diff --git a/pkg/sql/execstats/BUILD.bazel b/pkg/sql/execstats/BUILD.bazel index 758040b22009..dede9fd5d79d 100644 --- a/pkg/sql/execstats/BUILD.bazel +++ b/pkg/sql/execstats/BUILD.bazel @@ -20,9 +20,10 @@ go_test( srcs = [ "main_test.go", "traceanalyzer_test.go", + "utils_test.go", ], + embed = [":execstats"], deps = [ - ":execstats", "//pkg/base", "//pkg/roachpb", "//pkg/security", @@ -37,7 +38,10 @@ go_test( "//pkg/testutils/testcluster", "//pkg/util/leaktest", "//pkg/util/log", + "//pkg/util/optional", "//pkg/util/tracing", + "//pkg/util/uuid", + "//vendor/github.com/stretchr/testify/assert", "//vendor/github.com/stretchr/testify/require", ], ) diff --git a/pkg/sql/execstats/traceanalyzer.go b/pkg/sql/execstats/traceanalyzer.go index 3748e7d3122a..91639267587e 100644 --- a/pkg/sql/execstats/traceanalyzer.go +++ b/pkg/sql/execstats/traceanalyzer.go @@ -12,6 +12,7 @@ package execstats import ( "strconv" + "time" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" @@ -21,6 +22,7 @@ import ( "github.com/gogo/protobuf/types" ) +// processorStats contains stats for a specific processor extracted from a trace. type processorStats struct { nodeID roachpb.NodeID stats *execinfrapb.ComponentStats @@ -89,6 +91,7 @@ type NodeLevelStats struct { MaxMemoryUsageGroupedByNode map[roachpb.NodeID]int64 KVBytesReadGroupedByNode map[roachpb.NodeID]int64 KVRowsReadGroupedByNode map[roachpb.NodeID]int64 + KVTimeGroupedByNode map[roachpb.NodeID]time.Duration } // QueryLevelStats returns all the query level stats that correspond to the given traces and flow metadata. 
@@ -97,6 +100,7 @@ type QueryLevelStats struct { MaxMemUsage int64 KVBytesRead int64 KVRowsRead int64 + KVTime time.Duration } // TraceAnalyzer is a struct that helps calculate top-level statistics from a @@ -191,6 +195,7 @@ func (a *TraceAnalyzer) ProcessStats() error { MaxMemoryUsageGroupedByNode: make(map[roachpb.NodeID]int64), KVBytesReadGroupedByNode: make(map[roachpb.NodeID]int64), KVRowsReadGroupedByNode: make(map[roachpb.NodeID]int64), + KVTimeGroupedByNode: make(map[roachpb.NodeID]time.Duration), } var errs error @@ -201,6 +206,7 @@ func (a *TraceAnalyzer) ProcessStats() error { } a.nodeLevelStats.KVBytesReadGroupedByNode[stats.nodeID] += int64(stats.stats.KV.BytesRead.Value()) a.nodeLevelStats.KVRowsReadGroupedByNode[stats.nodeID] += int64(stats.stats.KV.TuplesRead.Value()) + a.nodeLevelStats.KVTimeGroupedByNode[stats.nodeID] += stats.stats.KV.KVTime.Value() } // Process streamStats. @@ -255,6 +261,7 @@ func (a *TraceAnalyzer) ProcessStats() error { MaxMemUsage: int64(0), KVBytesRead: int64(0), KVRowsRead: int64(0), + KVTime: time.Duration(0), } for _, bytesSentByNode := range a.nodeLevelStats.NetworkBytesSentGroupedByNode { @@ -274,6 +281,10 @@ func (a *TraceAnalyzer) ProcessStats() error { for _, kvRowsRead := range a.nodeLevelStats.KVRowsReadGroupedByNode { a.queryLevelStats.KVRowsRead += kvRowsRead } + + for _, kvTime := range a.nodeLevelStats.KVTimeGroupedByNode { + a.queryLevelStats.KVTime += kvTime + } return errs } diff --git a/pkg/sql/execstats/traceanalyzer_test.go b/pkg/sql/execstats/traceanalyzer_test.go index cd732c2ffd01..35056de62a13 100644 --- a/pkg/sql/execstats/traceanalyzer_test.go +++ b/pkg/sql/execstats/traceanalyzer_test.go @@ -13,7 +13,9 @@ package execstats_test import ( "context" "fmt" + "reflect" "testing" + "time" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -27,7 +29,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" 
"github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/optional" "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -166,3 +171,39 @@ func TestTraceAnalyzer(t *testing.T) { require.Equal(t, int64(30*8), queryLevelStats.KVBytesRead) } } + +func TestTraceAnalyzerProcessStats(t *testing.T) { + a := &execstats.TraceAnalyzer{FlowMetadata: &execstats.FlowMetadata{}} + a.AddComponentStats( + 1, /* nodeID */ + &execinfrapb.ComponentStats{ + Component: execinfrapb.ProcessorComponentID( + execinfrapb.FlowID{UUID: uuid.MakeV4()}, + 1, /* processorID */ + ), + KV: execinfrapb.KVStats{ + KVTime: optional.MakeTimeValue(3 * time.Second), + }, + }, + ) + + a.AddComponentStats( + 2, /* nodeID */ + &execinfrapb.ComponentStats{ + Component: execinfrapb.ProcessorComponentID( + execinfrapb.FlowID{UUID: uuid.MakeV4()}, + 2, /* processorID */ + ), + KV: execinfrapb.KVStats{ + KVTime: optional.MakeTimeValue(5 * time.Second), + }, + }, + ) + + expected := execstats.QueryLevelStats{KVTime: 8 * time.Second} + + assert.NoError(t, a.ProcessStats()) + if got := a.GetQueryLevelStats(); !reflect.DeepEqual(got, expected) { + t.Errorf("ProcessStats() = %v, want %v", got, expected) + } +} diff --git a/pkg/sql/execstats/utils_test.go b/pkg/sql/execstats/utils_test.go new file mode 100644 index 000000000000..dadfec8dfaa0 --- /dev/null +++ b/pkg/sql/execstats/utils_test.go @@ -0,0 +1,52 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package execstats + +import ( + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" +) + +// AddComponentStats modifies the TraceAnalyzer's internal state to add stats for the processor/stream/flow specified +// in stats.Component and the given node ID. +func (a *TraceAnalyzer) AddComponentStats( + nodeID roachpb.NodeID, stats *execinfrapb.ComponentStats, +) { + switch stats.Component.Type { + case execinfrapb.ComponentID_PROCESSOR: + processorStat := &processorStats{ + nodeID: nodeID, + stats: stats, + } + if a.FlowMetadata.processorStats == nil { + a.FlowMetadata.processorStats = make(map[execinfrapb.ProcessorID]*processorStats) + } + a.FlowMetadata.processorStats[execinfrapb.ProcessorID(stats.Component.ID)] = processorStat + case execinfrapb.ComponentID_STREAM: + streamStat := &streamStats{ + originNodeID: nodeID, + stats: stats, + } + if a.FlowMetadata.streamStats == nil { + a.FlowMetadata.streamStats = make(map[execinfrapb.StreamID]*streamStats) + } + a.FlowMetadata.streamStats[execinfrapb.StreamID(stats.Component.ID)] = streamStat + default: + flowStat := &flowStats{ + nodeID: nodeID, + } + flowStat.stats = append(flowStat.stats, stats) + if a.FlowMetadata.flowStats == nil { + a.FlowMetadata.flowStats = make(map[execinfrapb.FlowID]*flowStats) + } + a.FlowMetadata.flowStats[stats.Component.FlowID] = flowStat + } +}