Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql/ui: surface network bytes sent in statements page #55969

Merged
merged 3 commits into from
Nov 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion pkg/roachpb/app_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ func (s *StatementStatistics) Add(other *StatementStatistics) {
s.OverheadLat.Add(other.OverheadLat, s.Count, other.Count)
s.BytesRead.Add(other.BytesRead, s.Count, other.Count)
s.RowsRead.Add(other.RowsRead, s.Count, other.Count)
s.BytesSentOverNetwork.Add(other.BytesSentOverNetwork, s.Count, other.Count)

if other.SensitiveInfo.LastErr != "" {
s.SensitiveInfo.LastErr = other.SensitiveInfo.LastErr
Expand All @@ -155,5 +156,6 @@ func (s *StatementStatistics) AlmostEqual(other *StatementStatistics, eps float6
s.OverheadLat.AlmostEqual(other.OverheadLat, eps) &&
s.SensitiveInfo.Equal(other.SensitiveInfo) &&
s.BytesRead.AlmostEqual(other.BytesRead, eps) &&
s.RowsRead.AlmostEqual(other.RowsRead, eps)
s.RowsRead.AlmostEqual(other.RowsRead, eps) &&
s.BytesSentOverNetwork.AlmostEqual(other.BytesSentOverNetwork, eps)
}
254 changes: 150 additions & 104 deletions pkg/roachpb/app_stats.pb.go

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions pkg/roachpb/app_stats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ message StatementStatistics {
// RowsRead collects the number of rows read from disk.
optional NumericStat rows_read = 16 [(gogoproto.nullable) = false];

// BytesSentOverNetwork collects the number of bytes sent over the network.
optional NumericStat bytes_sent_over_network = 17 [(gogoproto.nullable) = false];

// Note: be sure to update `sql/app_stats.go` when adding/removing fields here!
}

Expand Down
1 change: 1 addition & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ go_library(
"//pkg/sql/enum",
"//pkg/sql/execinfra",
"//pkg/sql/execinfrapb",
"//pkg/sql/execstats",
"//pkg/sql/faketreeeval",
"//pkg/sql/flowinfra",
"//pkg/sql/gcjob/gcjobnotifier",
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/app_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,11 @@ func (a *appStats) recordStatement(
s.mu.data.OverheadLat.Record(s.mu.data.Count, ovhLat)
s.mu.data.BytesRead.Record(s.mu.data.Count, float64(stats.bytesRead))
s.mu.data.RowsRead.Record(s.mu.data.Count, float64(stats.rowsRead))
// Note that some fields derived from tracing statements (such as
// BytesSentOverNetwork) are not updated here because they are collected
// on-demand.
// TODO(asubiotto): Record the aforementioned fields here when always-on
// tracing is a thing.
s.mu.vectorized = vectorized
s.mu.distSQLUsed = distSQLUsed
s.mu.Unlock()
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/colexec/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,8 @@ func (vsc *VectorizedStatsCollector) OutputStats(
vsc.MaxAllocatedMem = 0
vsc.MaxAllocatedDisk = 0
vsc.NumBatches = 0
vsc.BytesRead = 0
// BytesRead is overridden to a useful value for tests.
vsc.BytesRead = 8 * vsc.NumTuples
}
span.SetSpanStats(&vsc.VectorizedStats)
span.Finish()
Expand Down
40 changes: 35 additions & 5 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/paramparse"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
Expand Down Expand Up @@ -380,6 +379,37 @@ func (ex *connExecutor) execStmtInOpenState(
)
}
}

stmtStats, _ := ex.appStats.getStatsForStmt(&stmt, ex.implicitTxn(), retErr, false)
if stmtStats == nil {
return
}

networkBytesSent := int64(0)
for _, flowInfo := range p.curPlan.distSQLFlowInfos {
analyzer := flowInfo.analyzer
if err := analyzer.AddTrace(trace); err != nil {
log.VInfof(ctx, 1, "error analyzing trace statistics for stmt %s: %v", stmt, err)
continue
}

networkBytesSentGroupedByNode, err := analyzer.GetNetworkBytesSent()
if err != nil {
log.VInfof(ctx, 1, "error calculating network bytes sent for stmt %s: %v", stmt, err)
continue
}
for _, bytesSentByNode := range networkBytesSentGroupedByNode {
networkBytesSent += bytesSentByNode
}
}

stmtStats.mu.Lock()
// Record trace-related statistics. A count of 1 is passed given that this
// statistic is only recorded when statement diagnostics are enabled.
// TODO(asubiotto): NumericStat properties will be properly calculated
// once this statistic is always collected.
stmtStats.mu.data.BytesSentOverNetwork.Record(1 /* count */, float64(networkBytesSent))
stmtStats.mu.Unlock()
}()
}

Expand Down Expand Up @@ -962,10 +992,10 @@ func (ex *connExecutor) execWithDistSQLEngine(
evalCtx := planner.ExtendedEvalContext()
planCtx := ex.server.cfg.DistSQLPlanner.NewPlanningCtx(ctx, evalCtx, planner, planner.txn, distribute)
planCtx.stmtType = recv.stmtType
if planner.collectBundle {
planCtx.saveDiagram = func(diagram execinfrapb.FlowDiagram) {
planner.curPlan.distSQLDiagrams = append(planner.curPlan.distSQLDiagrams, diagram)
}
if ex.server.cfg.TestingKnobs.TestingSaveFlows != nil {
planCtx.saveFlows = ex.server.cfg.TestingKnobs.TestingSaveFlows(planner.stmt.SQL)
} else if planner.collectBundle {
planCtx.saveFlows = planCtx.getDefaultSaveFlowsFunc(ctx, planner, planComponentTypeMainQuery)
}

var evalCtxFactory func() *extendedEvalContext
Expand Down
56 changes: 51 additions & 5 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/distsql"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/execstats"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
Expand Down Expand Up @@ -605,11 +606,10 @@ type PlanningCtx struct {
// be replaced by evaluation. Should only be set by EXPLAIN.
noEvalSubqueries bool

// If set, a diagram for the plan will be generated and passed to this
// function.
saveDiagram func(execinfrapb.FlowDiagram)
// If set, the diagram passed to saveDiagram will show the types of each
// stream.
// If set, the flows for the physical plan will be passed to this function.
// The flows are not safe for use past the lifetime of the saveFlows function.
saveFlows func(map[roachpb.NodeID]*execinfrapb.FlowSpec) error
// If set, the result of flowSpecsToDiagram will show the types of each stream.
saveDiagramShowInputTypes bool
}

Expand Down Expand Up @@ -637,6 +637,52 @@ func (p *PlanningCtx) EvaluateSubqueries() bool {
return !p.noEvalSubqueries
}

// getDefaultSaveFlowsFunc returns the default function used to save physical
// plans and their diagrams.
func (p *PlanningCtx) getDefaultSaveFlowsFunc(
ctx context.Context, planner *planner, typ planComponentType,
) func(map[roachpb.NodeID]*execinfrapb.FlowSpec) error {
return func(flows map[roachpb.NodeID]*execinfrapb.FlowSpec) error {
diagram, err := p.flowSpecsToDiagram(ctx, flows)
if err != nil {
return err
}
planner.curPlan.distSQLFlowInfos = append(
planner.curPlan.distSQLFlowInfos, flowInfo{typ: typ, diagram: diagram, analyzer: execstats.NewTraceAnalyzer(flows)},
)
return nil
}
}

// flowSpecsToDiagram is a helper function used to convert flowSpecs into a
// FlowDiagram using this PlanningCtx's information.
func (p *PlanningCtx) flowSpecsToDiagram(
ctx context.Context, flows map[roachpb.NodeID]*execinfrapb.FlowSpec,
) (execinfrapb.FlowDiagram, error) {
// Local flows might not have the UUID field set. We need it to be set to
// distinguish statistics for processors in subqueries vs the main query vs
// postqueries.
if len(flows) == 1 {
for _, f := range flows {
if f.FlowID == (execinfrapb.FlowID{}) {
f.FlowID.UUID = uuid.MakeV4()
}
}
}
log.VEvent(ctx, 1, "creating plan diagram")
var stmtStr string
if p.planner != nil && p.planner.stmt != nil {
stmtStr = p.planner.stmt.String()
}
diagram, err := execinfrapb.GeneratePlanDiagram(
stmtStr, flows, p.saveDiagramShowInputTypes,
)
if err != nil {
return nil, err
}
return diagram, nil
}

// PhysicalPlan is a partial physical plan which corresponds to a planNode
// (partial in that it can correspond to a planNode subtree and not necessarily
// to the entire planNode for a given query).
Expand Down
32 changes: 4 additions & 28 deletions pkg/sql/distsql_running.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -338,30 +337,11 @@ func (dsp *DistSQLPlanner) Run(
return func() {}
}

if planCtx.saveDiagram != nil {
// Local flows might not have the UUID field set. We need it to be set to
// distinguish statistics for processors in subqueries vs the main query vs
// postqueries.
if len(flows) == 1 {
for _, f := range flows {
if f.FlowID == (execinfrapb.FlowID{}) {
f.FlowID.UUID = uuid.MakeV4()
}
}
}
log.VEvent(ctx, 1, "creating plan diagram")
var stmtStr string
if planCtx.planner != nil && planCtx.planner.stmt != nil {
stmtStr = planCtx.planner.stmt.String()
}
diagram, err := execinfrapb.GeneratePlanDiagram(
stmtStr, flows, planCtx.saveDiagramShowInputTypes,
)
if err != nil {
if planCtx.saveFlows != nil {
if err := planCtx.saveFlows(flows); err != nil {
recv.SetError(err)
return func() {}
}
planCtx.saveDiagram(diagram)
}

if logPlanDiagram {
Expand Down Expand Up @@ -872,9 +852,7 @@ func (dsp *DistSQLPlanner) planAndRunSubquery(
subqueryPlanCtx := dsp.NewPlanningCtx(ctx, evalCtx, planner, planner.txn, distributeSubquery)
subqueryPlanCtx.stmtType = tree.Rows
if planner.collectBundle {
subqueryPlanCtx.saveDiagram = func(diagram execinfrapb.FlowDiagram) {
planner.curPlan.distSQLDiagrams = append(planner.curPlan.distSQLDiagrams, diagram)
}
subqueryPlanCtx.saveFlows = subqueryPlanCtx.getDefaultSaveFlowsFunc(ctx, planner, planComponentTypeSubquery)
}
// Don't close the top-level plan from subqueries - someone else will handle
// that.
Expand Down Expand Up @@ -1165,9 +1143,7 @@ func (dsp *DistSQLPlanner) planAndRunPostquery(
postqueryPlanCtx.stmtType = tree.Rows
postqueryPlanCtx.ignoreClose = true
if planner.collectBundle {
postqueryPlanCtx.saveDiagram = func(diagram execinfrapb.FlowDiagram) {
planner.curPlan.distSQLDiagrams = append(planner.curPlan.distSQLDiagrams, diagram)
}
postqueryPlanCtx.saveFlows = postqueryPlanCtx.getDefaultSaveFlowsFunc(ctx, planner, planComponentTypePostquery)
}

postqueryPhysPlan, err := dsp.createPhysPlan(postqueryPlanCtx, postqueryPlan)
Expand Down
8 changes: 8 additions & 0 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/colexec"
"github.com/cockroachdb/cockroach/pkg/sql/distsql"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/gcjob/gcjobnotifier"
"github.com/cockroachdb/cockroach/pkg/sql/opt"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
Expand Down Expand Up @@ -853,6 +854,13 @@ type ExecutorTestingKnobs struct {
// should be performed (typically turned on during tests only to guard against
// wild descriptors which are corrupted due to bugs).
TestingDescriptorValidation bool

// TestingSaveFlows, if set, will be called with the given stmt. The resulting
// function will be called with the physical plan of that statement's main
// query (i.e. no subqueries). The physical plan is only safe for use for the
// lifetime of this function. Note that returning a nil function is
// unsupported and will lead to a panic.
TestingSaveFlows func(stmt string) func(map[roachpb.NodeID]*execinfrapb.FlowSpec) error
}

// PGWireTestingKnobs contains knobs for the pgwire module.
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/execinfrapb/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/uuid"
)

// ProcessorID identifies a processor in the context of a specific flow.
type ProcessorID int

// StreamID identifies a stream; it may be local to a flow or it may cross
// machine boundaries. The identifier can only be used in the context of a
// specific flow.
Expand Down
45 changes: 45 additions & 0 deletions pkg/sql/execstats/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "execstats",
srcs = ["traceanalyzer.go"],
importpath = "github.com/cockroachdb/cockroach/pkg/sql/execstats",
visibility = ["//visibility:public"],
deps = [
"//pkg/roachpb",
"//pkg/sql/colexec/execpb",
"//pkg/sql/execinfrapb",
"//pkg/sql/flowinfra",
"//pkg/util/tracing/tracingpb",
"//vendor/github.com/cockroachdb/errors",
"//vendor/github.com/gogo/protobuf/types",
],
)

go_test(
name = "execstats_test",
srcs = [
"main_test.go",
"traceanalyzer_test.go",
],
deps = [
":execstats",
"//pkg/base",
"//pkg/roachpb",
"//pkg/security",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/sql",
"//pkg/sql/execinfra",
"//pkg/sql/execinfrapb",
"//pkg/sql/sessiondata",
"//pkg/sql/sessiondatapb",
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/tracing",
"//vendor/github.com/stretchr/testify/require",
],
)
29 changes: 29 additions & 0 deletions pkg/sql/execstats/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package execstats_test

import (
"os"
"testing"

"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/security/securitytest"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
)

func TestMain(m *testing.M) {
security.SetAssetLoader(securitytest.EmbeddedAssets)
serverutils.InitTestServerFactory(server.TestServerFactory)
serverutils.InitTestClusterFactory(testcluster.TestClusterFactory)
os.Exit(m.Run())
}
Loading