Skip to content

Commit

Permalink
Merge #55969
Browse files Browse the repository at this point in the history
55969: sql/ui: surface network bytes sent in statements page r=RaduBerinde,dhartunian a=asubiotto

Closes #55331 

This PR implements the conditional statistics collection and display described in phase 1 of the always-on EXPLAIN ANALYZE issue (#54556). The goal of this work is to provide a framework to surface more top-level statistics while always-on tracing is being worked on. The next steps here are to surface more top-level statistics we find interesting to the admin UI (e.g. query memory usage by @cathymw in #54340).

Once always-on tracing is completed, the code added to the conn executor to analyze trace data will be moved to `recordStatement`.

cc @jordanlewis 

Co-authored-by: Alfonso Subiotto Marques <[email protected]>
  • Loading branch information
craig[bot] and asubiotto committed Nov 2, 2020
2 parents 6debfee + 5ba1ef4 commit 2df27d8
Show file tree
Hide file tree
Showing 25 changed files with 751 additions and 170 deletions.
4 changes: 3 additions & 1 deletion pkg/roachpb/app_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ func (s *StatementStatistics) Add(other *StatementStatistics) {
s.OverheadLat.Add(other.OverheadLat, s.Count, other.Count)
s.BytesRead.Add(other.BytesRead, s.Count, other.Count)
s.RowsRead.Add(other.RowsRead, s.Count, other.Count)
s.BytesSentOverNetwork.Add(other.BytesSentOverNetwork, s.Count, other.Count)

if other.SensitiveInfo.LastErr != "" {
s.SensitiveInfo.LastErr = other.SensitiveInfo.LastErr
Expand All @@ -155,5 +156,6 @@ func (s *StatementStatistics) AlmostEqual(other *StatementStatistics, eps float6
s.OverheadLat.AlmostEqual(other.OverheadLat, eps) &&
s.SensitiveInfo.Equal(other.SensitiveInfo) &&
s.BytesRead.AlmostEqual(other.BytesRead, eps) &&
s.RowsRead.AlmostEqual(other.RowsRead, eps)
s.RowsRead.AlmostEqual(other.RowsRead, eps) &&
s.BytesSentOverNetwork.AlmostEqual(other.BytesSentOverNetwork, eps)
}
254 changes: 150 additions & 104 deletions pkg/roachpb/app_stats.pb.go

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions pkg/roachpb/app_stats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ message StatementStatistics {
// RowsRead collects the number of rows read from disk.
optional NumericStat rows_read = 16 [(gogoproto.nullable) = false];

// BytesSentOverNetwork collects the number of bytes sent over the network.
optional NumericStat bytes_sent_over_network = 17 [(gogoproto.nullable) = false];

// Note: be sure to update `sql/app_stats.go` when adding/removing fields here!
}

Expand Down
1 change: 1 addition & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ go_library(
"//pkg/sql/enum",
"//pkg/sql/execinfra",
"//pkg/sql/execinfrapb",
"//pkg/sql/execstats",
"//pkg/sql/faketreeeval",
"//pkg/sql/flowinfra",
"//pkg/sql/gcjob/gcjobnotifier",
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/app_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,11 @@ func (a *appStats) recordStatement(
s.mu.data.OverheadLat.Record(s.mu.data.Count, ovhLat)
s.mu.data.BytesRead.Record(s.mu.data.Count, float64(stats.bytesRead))
s.mu.data.RowsRead.Record(s.mu.data.Count, float64(stats.rowsRead))
// Note that some fields derived from tracing statements (such as
// BytesSentOverNetwork) are not updated here because they are collected
// on-demand.
// TODO(asubiotto): Record the aforementioned fields here when always-on
// tracing is a thing.
s.mu.vectorized = vectorized
s.mu.distSQLUsed = distSQLUsed
s.mu.Unlock()
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/colexec/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,8 @@ func (vsc *VectorizedStatsCollector) OutputStats(
vsc.MaxAllocatedMem = 0
vsc.MaxAllocatedDisk = 0
vsc.NumBatches = 0
vsc.BytesRead = 0
// BytesRead is overridden to a useful value for tests.
vsc.BytesRead = 8 * vsc.NumTuples
}
span.SetSpanStats(&vsc.VectorizedStats)
span.Finish()
Expand Down
40 changes: 35 additions & 5 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/paramparse"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
Expand Down Expand Up @@ -380,6 +379,37 @@ func (ex *connExecutor) execStmtInOpenState(
)
}
}

stmtStats, _ := ex.appStats.getStatsForStmt(&stmt, ex.implicitTxn(), retErr, false)
if stmtStats == nil {
return
}

networkBytesSent := int64(0)
for _, flowInfo := range p.curPlan.distSQLFlowInfos {
analyzer := flowInfo.analyzer
if err := analyzer.AddTrace(trace); err != nil {
log.VInfof(ctx, 1, "error analyzing trace statistics for stmt %s: %v", stmt, err)
continue
}

networkBytesSentGroupedByNode, err := analyzer.GetNetworkBytesSent()
if err != nil {
log.VInfof(ctx, 1, "error calculating network bytes sent for stmt %s: %v", stmt, err)
continue
}
for _, bytesSentByNode := range networkBytesSentGroupedByNode {
networkBytesSent += bytesSentByNode
}
}

stmtStats.mu.Lock()
// Record trace-related statistics. A count of 1 is passed given that this
// statistic is only recorded when statement diagnostics are enabled.
// TODO(asubiotto): NumericStat properties will be properly calculated
// once this statistic is always collected.
stmtStats.mu.data.BytesSentOverNetwork.Record(1 /* count */, float64(networkBytesSent))
stmtStats.mu.Unlock()
}()
}

Expand Down Expand Up @@ -962,10 +992,10 @@ func (ex *connExecutor) execWithDistSQLEngine(
evalCtx := planner.ExtendedEvalContext()
planCtx := ex.server.cfg.DistSQLPlanner.NewPlanningCtx(ctx, evalCtx, planner, planner.txn, distribute)
planCtx.stmtType = recv.stmtType
if planner.collectBundle {
planCtx.saveDiagram = func(diagram execinfrapb.FlowDiagram) {
planner.curPlan.distSQLDiagrams = append(planner.curPlan.distSQLDiagrams, diagram)
}
if ex.server.cfg.TestingKnobs.TestingSaveFlows != nil {
planCtx.saveFlows = ex.server.cfg.TestingKnobs.TestingSaveFlows(planner.stmt.SQL)
} else if planner.collectBundle {
planCtx.saveFlows = planCtx.getDefaultSaveFlowsFunc(ctx, planner, planComponentTypeMainQuery)
}

var evalCtxFactory func() *extendedEvalContext
Expand Down
56 changes: 51 additions & 5 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/distsql"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/execstats"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
Expand Down Expand Up @@ -605,11 +606,10 @@ type PlanningCtx struct {
// be replaced by evaluation. Should only be set by EXPLAIN.
noEvalSubqueries bool

// If set, a diagram for the plan will be generated and passed to this
// function.
saveDiagram func(execinfrapb.FlowDiagram)
// If set, the diagram passed to saveDiagram will show the types of each
// stream.
// If set, the flows for the physical plan will be passed to this function.
// The flows are not safe for use past the lifetime of the saveFlows function.
saveFlows func(map[roachpb.NodeID]*execinfrapb.FlowSpec) error
// If set, the result of flowSpecsToDiagram will show the types of each stream.
saveDiagramShowInputTypes bool
}

Expand Down Expand Up @@ -637,6 +637,52 @@ func (p *PlanningCtx) EvaluateSubqueries() bool {
return !p.noEvalSubqueries
}

// getDefaultSaveFlowsFunc builds the standard saveFlows callback. The returned
// closure turns the flows it is given into a diagram and appends a flowInfo
// (tagged with typ, holding that diagram and a trace analyzer created from the
// same flows) to planner.curPlan.distSQLFlowInfos. If the diagram cannot be
// generated, the error is returned and nothing is appended.
func (p *PlanningCtx) getDefaultSaveFlowsFunc(
	ctx context.Context, planner *planner, typ planComponentType,
) func(map[roachpb.NodeID]*execinfrapb.FlowSpec) error {
	saveFlows := func(flowSpecs map[roachpb.NodeID]*execinfrapb.FlowSpec) error {
		flowDiagram, diagramErr := p.flowSpecsToDiagram(ctx, flowSpecs)
		if diagramErr != nil {
			return diagramErr
		}
		info := flowInfo{
			typ:      typ,
			diagram:  flowDiagram,
			analyzer: execstats.NewTraceAnalyzer(flowSpecs),
		}
		planner.curPlan.distSQLFlowInfos = append(planner.curPlan.distSQLFlowInfos, info)
		return nil
	}
	return saveFlows
}

// flowSpecsToDiagram converts the given flow specs into a FlowDiagram. The
// statement string is taken from this PlanningCtx's planner when one is set,
// and saveDiagramShowInputTypes is forwarded to the diagram generator.
func (p *PlanningCtx) flowSpecsToDiagram(
	ctx context.Context, flows map[roachpb.NodeID]*execinfrapb.FlowSpec,
) (execinfrapb.FlowDiagram, error) {
	// A local flow might not have its UUID field set. It has to be set so
	// that statistics for processors in subqueries, the main query and
	// postqueries can be told apart.
	if len(flows) == 1 {
		var unsetID execinfrapb.FlowID
		for _, spec := range flows {
			if spec.FlowID != unsetID {
				continue
			}
			spec.FlowID.UUID = uuid.MakeV4()
		}
	}

	log.VEvent(ctx, 1, "creating plan diagram")

	// The statement text is optional: it stays empty when no planner or
	// statement is attached to this planning context.
	stmtText := ""
	if p.planner != nil && p.planner.stmt != nil {
		stmtText = p.planner.stmt.String()
	}

	flowDiagram, genErr := execinfrapb.GeneratePlanDiagram(
		stmtText, flows, p.saveDiagramShowInputTypes,
	)
	if genErr != nil {
		return nil, genErr
	}
	return flowDiagram, nil
}

// PhysicalPlan is a partial physical plan which corresponds to a planNode
// (partial in that it can correspond to a planNode subtree and not necessarily
// to the entire planNode for a given query).
Expand Down
32 changes: 4 additions & 28 deletions pkg/sql/distsql_running.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -338,30 +337,11 @@ func (dsp *DistSQLPlanner) Run(
return func() {}
}

if planCtx.saveDiagram != nil {
// Local flows might not have the UUID field set. We need it to be set to
// distinguish statistics for processors in subqueries vs the main query vs
// postqueries.
if len(flows) == 1 {
for _, f := range flows {
if f.FlowID == (execinfrapb.FlowID{}) {
f.FlowID.UUID = uuid.MakeV4()
}
}
}
log.VEvent(ctx, 1, "creating plan diagram")
var stmtStr string
if planCtx.planner != nil && planCtx.planner.stmt != nil {
stmtStr = planCtx.planner.stmt.String()
}
diagram, err := execinfrapb.GeneratePlanDiagram(
stmtStr, flows, planCtx.saveDiagramShowInputTypes,
)
if err != nil {
if planCtx.saveFlows != nil {
if err := planCtx.saveFlows(flows); err != nil {
recv.SetError(err)
return func() {}
}
planCtx.saveDiagram(diagram)
}

if logPlanDiagram {
Expand Down Expand Up @@ -872,9 +852,7 @@ func (dsp *DistSQLPlanner) planAndRunSubquery(
subqueryPlanCtx := dsp.NewPlanningCtx(ctx, evalCtx, planner, planner.txn, distributeSubquery)
subqueryPlanCtx.stmtType = tree.Rows
if planner.collectBundle {
subqueryPlanCtx.saveDiagram = func(diagram execinfrapb.FlowDiagram) {
planner.curPlan.distSQLDiagrams = append(planner.curPlan.distSQLDiagrams, diagram)
}
subqueryPlanCtx.saveFlows = subqueryPlanCtx.getDefaultSaveFlowsFunc(ctx, planner, planComponentTypeSubquery)
}
// Don't close the top-level plan from subqueries - someone else will handle
// that.
Expand Down Expand Up @@ -1165,9 +1143,7 @@ func (dsp *DistSQLPlanner) planAndRunPostquery(
postqueryPlanCtx.stmtType = tree.Rows
postqueryPlanCtx.ignoreClose = true
if planner.collectBundle {
postqueryPlanCtx.saveDiagram = func(diagram execinfrapb.FlowDiagram) {
planner.curPlan.distSQLDiagrams = append(planner.curPlan.distSQLDiagrams, diagram)
}
postqueryPlanCtx.saveFlows = postqueryPlanCtx.getDefaultSaveFlowsFunc(ctx, planner, planComponentTypePostquery)
}

postqueryPhysPlan, err := dsp.createPhysPlan(postqueryPlanCtx, postqueryPlan)
Expand Down
8 changes: 8 additions & 0 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/colexec"
"github.com/cockroachdb/cockroach/pkg/sql/distsql"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/gcjob/gcjobnotifier"
"github.com/cockroachdb/cockroach/pkg/sql/opt"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
Expand Down Expand Up @@ -853,6 +854,13 @@ type ExecutorTestingKnobs struct {
// should be performed (typically turned on during tests only to guard against
// wild descriptors which are corrupted due to bugs).
TestingDescriptorValidation bool

// TestingSaveFlows, if set, will be called with the given stmt. The resulting
// function will be called with the physical plan of that statement's main
// query (i.e. no subqueries). The physical plan is only safe for use for the
// lifetime of this function. Note that returning a nil function is
// unsupported and will lead to a panic.
TestingSaveFlows func(stmt string) func(map[roachpb.NodeID]*execinfrapb.FlowSpec) error
}

// PGWireTestingKnobs contains knobs for the pgwire module.
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/execinfrapb/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/uuid"
)

// ProcessorID identifies a processor in the context of a specific flow. It is
// represented as a plain int.
type ProcessorID int

// StreamID identifies a stream; it may be local to a flow or it may cross
// machine boundaries. The identifier can only be used in the context of a
// specific flow.
Expand Down
45 changes: 45 additions & 0 deletions pkg/sql/execstats/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Bazel build definitions for the execstats package: one Go library target
# and one Go test target.
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

# Library target built from traceanalyzer.go; publicly visible.
go_library(
name = "execstats",
srcs = ["traceanalyzer.go"],
importpath = "github.com/cockroachdb/cockroach/pkg/sql/execstats",
visibility = ["//visibility:public"],
deps = [
"//pkg/roachpb",
"//pkg/sql/colexec/execpb",
"//pkg/sql/execinfrapb",
"//pkg/sql/flowinfra",
"//pkg/util/tracing/tracingpb",
"//vendor/github.com/cockroachdb/errors",
"//vendor/github.com/gogo/protobuf/types",
],
)

# Test target; it depends on the library target above (":execstats") in
# addition to the listed server, SQL and test-utility packages.
go_test(
name = "execstats_test",
srcs = [
"main_test.go",
"traceanalyzer_test.go",
],
deps = [
":execstats",
"//pkg/base",
"//pkg/roachpb",
"//pkg/security",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/sql",
"//pkg/sql/execinfra",
"//pkg/sql/execinfrapb",
"//pkg/sql/sessiondata",
"//pkg/sql/sessiondatapb",
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/tracing",
"//vendor/github.com/stretchr/testify/require",
],
)
29 changes: 29 additions & 0 deletions pkg/sql/execstats/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package execstats_test

import (
"os"
"testing"

"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/security/securitytest"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
)

// TestMain is the entry point for this package's tests. It installs the
// embedded security assets and registers the test server and test cluster
// factories, then runs the tests and exits with their result code.
func TestMain(m *testing.M) {
	security.SetAssetLoader(securitytest.EmbeddedAssets)
	serverutils.InitTestServerFactory(server.TestServerFactory)
	serverutils.InitTestClusterFactory(testcluster.TestClusterFactory)

	exitCode := m.Run()
	os.Exit(exitCode)
}
Loading

0 comments on commit 2df27d8

Please sign in to comment.