Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
59132: sql: introduce sql.statement_stats.sample_rate to sample execution stats r=RaduBerinde,dhartunian a=asubiotto

Depends on cockroachdb#58897 
Depends on cockroachdb#59103 

This PR puts the "always-on" into always-on EXPLAIN ANALYZE. Take a look at separate commits for details. What actually goes on is that we're taking the slightly safer route of introducing a cluster setting which defines a sample rate for execution stats. These execution stats are collected and propagated using background tracing, which is cheaper than verbose tracing. This allows us to power the new DB Console statement stats views. Currently we still need user input in order to turn up this sample rate.

The sample rate is 0 by default in this PR for safety reasons. I'd like to discuss the default value of this cluster setting or whether we need it at all separately before the 21.1 release, but this gives us a nice escape hatch if for whatever reason stats collection results in poor performance.

Closes cockroachdb#54556 

59536: colexec,bazel: pin the `types` dependency in generated files r=irfansharif a=irfansharif

This is a workaround for bazel auto-generated code. goimports does not
automatically pick up the right packages when run within the bazel
sandbox, so we have to pin it by hand.

Release note: None

Co-authored-by: Alfonso Subiotto Marques <[email protected]>
Co-authored-by: irfan sharif <[email protected]>
  • Loading branch information
3 people committed Jan 28, 2021
3 parents eddbc07 + 2615ca9 + 0d79e70 commit 7d27d9a
Show file tree
Hide file tree
Showing 37 changed files with 560 additions and 228 deletions.
14 changes: 12 additions & 2 deletions pkg/roachpb/app_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,16 @@ func (s *StatementStatistics) Add(other *StatementStatistics) {
s.OverheadLat.Add(other.OverheadLat, s.Count, other.Count)
s.BytesRead.Add(other.BytesRead, s.Count, other.Count)
s.RowsRead.Add(other.RowsRead, s.Count, other.Count)
s.BytesSentOverNetwork.Add(other.BytesSentOverNetwork, s.Count, other.Count)

// Execution stats collected using a sampling approach.
statCollectionCount := s.ExecStatCollectionCount
if statCollectionCount == 0 && other.ExecStatCollectionCount == 0 {
// If both are zero, artificially set the receiver's count to one to avoid
// division by zero in Add.
statCollectionCount = 1
}
s.BytesSentOverNetwork.Add(other.BytesSentOverNetwork, s.ExecStatCollectionCount, statCollectionCount)
s.MaxMemUsage.Add(other.MaxMemUsage, s.ExecStatCollectionCount, statCollectionCount)

if other.SensitiveInfo.LastErr != "" {
s.SensitiveInfo.LastErr = other.SensitiveInfo.LastErr
Expand Down Expand Up @@ -157,5 +166,6 @@ func (s *StatementStatistics) AlmostEqual(other *StatementStatistics, eps float6
s.SensitiveInfo.Equal(other.SensitiveInfo) &&
s.BytesRead.AlmostEqual(other.BytesRead, eps) &&
s.RowsRead.AlmostEqual(other.RowsRead, eps) &&
s.BytesSentOverNetwork.AlmostEqual(other.BytesSentOverNetwork, eps)
s.BytesSentOverNetwork.AlmostEqual(other.BytesSentOverNetwork, eps) &&
s.MaxMemUsage.AlmostEqual(other.MaxMemUsage, eps)
}
294 changes: 188 additions & 106 deletions pkg/roachpb/app_stats.pb.go

Large diffs are not rendered by default.

17 changes: 16 additions & 1 deletion pkg/roachpb/app_stats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ option go_package = "roachpb";
import "gogoproto/gogo.proto";
import "google/protobuf/timestamp.proto";

// StatementStatistics represent the statement statistics sent to the DB
// Console for a given statement fingerprint. Note that these stats are cleared
// ever diagnostics.sql_stat_reset_interval.
// N.B. When fields are added to this struct, make sure to update
// (*StatementStatistics).Add and (*StatementStatistics).AlmostEqual
// in app_stats.go.
Expand Down Expand Up @@ -89,7 +92,19 @@ message StatementStatistics {
// BytesSentOverNetwork collects the number of bytes sent over the network.
optional NumericStat bytes_sent_over_network = 17 [(gogoproto.nullable) = false];

// Note: be sure to update `sql/app_stats.go` when adding/removing fields here!
// MaxMemUsage collects the maximum memory usage that occurred on a node.
optional NumericStat max_mem_usage = 18 [(gogoproto.nullable) = false];

// ExecStatCollectionCount keeps track of how many times execution stats were
// recorded for this statement. Since this collection follows a sampling
// approach, this number is not necessarily equal to Count. Used to calculate
// the mean of the following NumericStat values:
// bytes_sent_over_network
// max_mem_usage
optional int64 exec_stat_collection_count = 19 [(gogoproto.nullable) = false];

// Note: be sure to update `sql/app_stats.go` and the comment above
// exec_stat_collection_count when adding/removing fields here!
}

message TransactionStatistics {
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,7 @@ go_test(
"explain_test.go",
"explain_tree_test.go",
"indexbackfiller_test.go",
"instrumentation_test.go",
"internal_test.go",
"main_test.go",
"materialized_view_test.go",
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/colexec/crossjoiner.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/sql/colexec/crossjoiner_tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
// pick up the right packages when run within the bazel sandbox.
var (
_ = typeconv.DatumVecCanonicalTypeFamily
_ = types.BoolFamily
)

// buildFromLeftInput builds part of the output of a cross join that comes from
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colflow/vectorized_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (f *vectorizedFlow) Setup(
log.Infof(ctx, "setting up vectorize flow %s", f.ID.Short())
}
recordingStats := false
if sp := tracing.SpanFromContext(ctx); sp != nil && sp.IsVerbose() {
if execinfra.ShouldCollectStats(ctx, &f.FlowCtx) {
recordingStats = true
}
helper := newVectorizedFlowCreatorHelper(f.FlowBase)
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"fmt"
"io"
"math"
"math/rand"
"strings"
"sync/atomic"
"time"
Expand Down Expand Up @@ -595,6 +596,7 @@ func (s *Server) newConnExecutor(
// ctxHolder will be reset at the start of run(). We only define
// it here so that an early call to close() doesn't panic.
ctxHolder: ctxHolder{connCtx: ctx},
rng: rand.New(rand.NewSource(timeutil.Now().UnixNano())),
executorType: executorTypeExec,
hasCreatedTemporarySchema: false,
stmtDiagnosticsRecorder: s.cfg.StmtDiagnosticsRecorder,
Expand Down Expand Up @@ -1077,6 +1079,10 @@ type connExecutor struct {
// transactions.
statsCollector *sqlStatsCollector

// rng is used to generate random numbers. An example usage is to determine
// whether to sample execution stats.
rng *rand.Rand

// mu contains of all elements of the struct that can be changed
// after initialization, and may be accessed from another thread.
mu struct {
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ func (ex *connExecutor) execStmtInOpenState(
var needFinish bool
ctx, needFinish = ih.Setup(
ctx, ex.server.cfg, ex.appStats, p, ex.stmtDiagnosticsRecorder,
stmt.AnonymizedStr, os.ImplicitTxn.Get(),
stmt.AnonymizedStr, os.ImplicitTxn.Get(), ex.rng,
)
if needFinish {
sql := stmt.SQL
Expand Down Expand Up @@ -975,6 +975,7 @@ func (ex *connExecutor) execWithDistSQLEngine(
planCtx.saveFlows = planCtx.getDefaultSaveFlowsFunc(ctx, planner, planComponentTypeMainQuery)
}
planCtx.traceMetadata = planner.instrumentation.traceMetadata
planCtx.collectExecStats = planner.instrumentation.ShouldCollectExecStats()

var evalCtxFactory func() *extendedEvalContext
if len(planner.curPlan.subqueryPlans) != 0 ||
Expand Down
4 changes: 3 additions & 1 deletion pkg/sql/distsql/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ func (ds *ServerImpl) setupFlow(

// Create the FlowCtx for the flow.
flowCtx := ds.NewFlowContext(
ctx, req.Flow.FlowID, evalCtx, req.TraceKV, localState, req.Flow.Gateway == roachpb.NodeID(ds.NodeID.SQLInstanceID()),
ctx, req.Flow.FlowID, evalCtx, req.TraceKV, req.CollectStats, localState, req.Flow.Gateway == roachpb.NodeID(ds.NodeID.SQLInstanceID()),
)

// req always contains the desired vectorize mode, regardless of whether we
Expand Down Expand Up @@ -400,6 +400,7 @@ func (ds *ServerImpl) NewFlowContext(
id execinfrapb.FlowID,
evalCtx *tree.EvalContext,
traceKV bool,
collectStats bool,
localState LocalState,
isGatewayNode bool,
) execinfra.FlowCtx {
Expand All @@ -411,6 +412,7 @@ func (ds *ServerImpl) NewFlowContext(
EvalCtx: evalCtx,
NodeID: ds.ServerConfig.NodeID,
TraceKV: traceKV,
CollectStats: collectStats,
Local: localState.IsLocal,
Gateway: isGatewayNode,
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,9 @@ type PlanningCtx struct {
// If set, we will record the mapping from planNode to tracing metadata to
// later allow associating statistics with the planNode.
traceMetadata execNodeTraceMetadata

// If set, statement execution stats should be collected.
collectExecStats bool
}

var _ physicalplan.ExprContext = &PlanningCtx{}
Expand Down
8 changes: 7 additions & 1 deletion pkg/sql/distsql_running.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func (dsp *DistSQLPlanner) setupFlows(
recv *DistSQLReceiver,
localState distsql.LocalState,
vectorizeThresholdMet bool,
collectStats bool,
) (context.Context, flowinfra.Flow, error) {
thisNodeID := dsp.gatewayNodeID
_, ok := flows[thisNodeID]
Expand All @@ -141,6 +142,7 @@ func (dsp *DistSQLPlanner) setupFlows(
Version: execinfra.Version,
EvalContext: evalCtxProto,
TraceKV: evalCtx.Tracing.KVTracingEnabled(),
CollectStats: collectStats,
}

// Start all the flows except the flow on this node (there is always a flow on
Expand Down Expand Up @@ -333,7 +335,9 @@ func (dsp *DistSQLPlanner) Run(
localState.IsLocal = true
}

ctx, flow, err := dsp.setupFlows(ctx, evalCtx, leafInputState, flows, recv, localState, vectorizedThresholdMet)
ctx, flow, err := dsp.setupFlows(
ctx, evalCtx, leafInputState, flows, recv, localState, vectorizedThresholdMet, planCtx.collectExecStats,
)
if err != nil {
recv.SetError(err)
return func() {}
Expand Down Expand Up @@ -834,6 +838,7 @@ func (dsp *DistSQLPlanner) planAndRunSubquery(
subqueryPlanCtx.saveFlows = subqueryPlanCtx.getDefaultSaveFlowsFunc(ctx, planner, planComponentTypeSubquery)
}
subqueryPlanCtx.traceMetadata = planner.instrumentation.traceMetadata
subqueryPlanCtx.collectExecStats = planner.instrumentation.ShouldCollectExecStats()
// Don't close the top-level plan from subqueries - someone else will handle
// that.
subqueryPlanCtx.ignoreClose = true
Expand Down Expand Up @@ -1126,6 +1131,7 @@ func (dsp *DistSQLPlanner) planAndRunPostquery(
postqueryPlanCtx.saveFlows = postqueryPlanCtx.getDefaultSaveFlowsFunc(ctx, planner, planComponentTypePostquery)
}
postqueryPlanCtx.traceMetadata = planner.instrumentation.traceMetadata
postqueryPlanCtx.collectExecStats = planner.instrumentation.ShouldCollectExecStats()

postqueryPhysPlan, err := dsp.createPhysPlan(postqueryPlanCtx, postqueryPlan)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/execinfra/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ go_library(
"readerbase.go",
"scanbase.go",
"server_config.go",
"stats.go",
"testutils.go",
"version.go",
":gen-consumerstatus-stringer", # keep
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/execinfra/flow_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ type FlowCtx struct {
// TraceKV is true if KV tracing was requested by the session.
TraceKV bool

// CollectStats is true if execution stats collection was requested.
CollectStats bool

// Local is true if this flow is being run as part of a local-only query.
Local bool

Expand Down
25 changes: 25 additions & 0 deletions pkg/sql/execinfra/stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package execinfra

import (
"context"

"github.com/cockroachdb/cockroach/pkg/util/tracing"
)

// ShouldCollectStats is a helper function used to determine if a processor
// should collect stats. The two requirements are that tracing must be enabled
// (to be able to output the stats somewhere), and that the flowCtx.CollectStats
// flag was set by the gateway node.
func ShouldCollectStats(ctx context.Context, flowCtx *FlowCtx) bool {
return tracing.SpanFromContext(ctx) != nil && flowCtx.CollectStats
}
Loading

0 comments on commit 7d27d9a

Please sign in to comment.