Merge #56286
56286: sql: refactor instrumentation of query execution r=RaduBerinde a=RaduBerinde

Note: this PR covers only the top two commits; the other commits are from #56262.

#### sql: refactor instrumentation of query execution

This change separates the code around setting up tracing and
collecting bundles into a new instrumentationHelper. The intention is
that more related logic (such as collecting plans for the UI) will be
incorporated into it.

Release note: None
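
For illustration, here is a minimal, self-contained sketch of the Setup / deferred-Finish pattern that execStmtInOpenState now follows (see the conn_executor_exec.go hunk below). All types, fields, and signatures here are simplified stand-ins for exposition only; the real helper lives in pkg/sql/instrumentation.go and also wires up tracing, bundle collection, and plan capture.

```go
package main

import (
	"context"
	"fmt"
)

// instrumentationHelper is a greatly simplified stand-in for the helper
// introduced in pkg/sql/instrumentation.go.
type instrumentationHelper struct {
	collectBundle bool
	discardRows   bool
	fingerprint   string
}

// Setup decides whether any instrumentation is needed for this statement and
// returns a (possibly derived) context plus a flag telling the caller whether
// Finish must be invoked when execution ends.
func (ih *instrumentationHelper) Setup(
	ctx context.Context, fingerprint string, collectBundle bool,
) (context.Context, bool) {
	ih.fingerprint = fingerprint
	ih.collectBundle = collectBundle
	return ctx, collectBundle
}

// SetDiscardRows marks the statement so that result rows are not returned to
// the client (e.g. for EXPLAIN ANALYZE (DEBUG)).
func (ih *instrumentationHelper) SetDiscardRows() { ih.discardRows = true }

// ShouldDiscardRows is consulted by the execution engine.
func (ih *instrumentationHelper) ShouldDiscardRows() bool { return ih.discardRows }

// Finish assembles whatever was collected (trace, plans, bundle) and returns
// the possibly updated error to report to the client.
func (ih *instrumentationHelper) Finish(retErr error) error {
	if ih.collectBundle {
		fmt.Printf("building diagnostics bundle for %q\n", ih.fingerprint)
	}
	return retErr
}

func main() {
	var ih instrumentationHelper
	ctx := context.Background()

	// Mirrors the call shape in execStmtInOpenState: Setup once, then defer
	// Finish only when instrumentation is actually active.
	ctx, needFinish := ih.Setup(ctx, "SELECT * FROM t WHERE k = _", true /* collectBundle */)
	_ = ctx // in the real code, this context flows into statement execution
	var retErr error
	if needFinish {
		defer func() { retErr = ih.Finish(retErr) }()
	}
	ih.SetDiscardRows()
	fmt.Println("discard rows:", ih.ShouldDiscardRows())
}
```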

#### sql: move more logic into instrumentationHelper

This commit moves the logic around collecting explain plans (for
bundles and for the UI) into `instrumentationHelper`.

Release note: None
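
For illustration, a minimal sketch of how callers now ask the helper for plan information via accessors instead of reading fields off the planner (compare the `ShouldCollectBundle` and `PlanForStats` call sites in the diff below). Again, every type, field, and method body here is a simplified stand-in, not the real CockroachDB API.

```go
package main

import (
	"context"
	"fmt"
)

// planNode is a stand-in for the sampled plan tree that the UI and statement
// statistics consume.
type planNode struct {
	Name string
}

// instrumentationHelper here models only the plan-capture part of the real
// helper: plans are stored during planning and handed out via accessors so
// callers no longer reach into planner.curPlan directly.
type instrumentationHelper struct {
	collectBundle    bool
	savePlanForStats bool

	planForStats *planNode
	planString   string
}

// ShouldCollectBundle reports whether a statement diagnostics bundle is being
// assembled for the current statement.
func (ih *instrumentationHelper) ShouldCollectBundle() bool { return ih.collectBundle }

// RecordPlanInfo is called after planning; it captures only what was requested.
func (ih *instrumentationHelper) RecordPlanInfo(plan *planNode, planText string) {
	if ih.savePlanForStats {
		ih.planForStats = plan
	}
	if ih.collectBundle {
		ih.planString = planText
	}
}

// PlanForStats returns the captured plan tree, or nil if it was not requested.
func (ih *instrumentationHelper) PlanForStats(ctx context.Context) *planNode {
	return ih.planForStats
}

func main() {
	ih := instrumentationHelper{collectBundle: true, savePlanForStats: true}
	ih.RecordPlanInfo(&planNode{Name: "scan t@primary"}, "scan\n  table: t@primary")

	if ih.ShouldCollectBundle() {
		fmt.Println("plan.txt for the bundle:\n" + ih.planString)
	}
	fmt.Println("plan for UI/stats:", ih.PlanForStats(context.Background()).Name)
}
```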

Co-authored-by: Radu Berinde <[email protected]>
craig[bot] and RaduBerinde committed Nov 6, 2020
2 parents 8489da9 + 8c27403 commit 3765351
Showing 11 changed files with 320 additions and 186 deletions.
1 change: 1 addition & 0 deletions pkg/sql/BUILD.bazel
@@ -102,6 +102,7 @@ go_library(
"information_schema.go",
"insert.go",
"insert_fast_path.go",
"instrumentation.go",
"internal.go",
"inverted_filter.go",
"inverted_join.go",
3 changes: 1 addition & 2 deletions pkg/sql/conn_executor.go
@@ -2179,6 +2179,7 @@ func (ex *connExecutor) resetPlanner(
) {
p.txn = txn
p.stmt = Statement{}
p.instrumentation = instrumentationHelper{}

p.cancelChecker.Reset(ctx)

@@ -2193,8 +2194,6 @@
p.autoCommit = false
p.isPreparing = false
p.avoidCachedDescriptors = false
p.discardRows = false
p.collectBundle = false
}

// txnStateTransitionsApplyWrapper is a wrapper on top of Machine built with the
110 changes: 13 additions & 97 deletions pkg/sql/conn_executor_exec.go
@@ -31,7 +31,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/cancelchecker"
@@ -340,15 +339,11 @@ func (ex *connExecutor) execStmtInOpenState(
ex.resetPlanner(ctx, p, ex.state.mu.txn, stmtTS)
p.sessionDataMutator.paramStatusUpdater = res
p.noticeSender = res

var shouldCollectDiagnostics bool
var diagRequestID stmtdiagnostics.RequestID
var finishCollectionDiagnostics func()
ih := &p.instrumentation

if explainBundle, ok := ast.(*tree.ExplainAnalyzeDebug); ok {
telemetry.Inc(sqltelemetry.ExplainAnalyzeDebugUseCounter)
// Always collect diagnostics for EXPLAIN ANALYZE (DEBUG).
shouldCollectDiagnostics = true
ih.SetOutputMode(explainAnalyzeDebugOutput)
// Strip off the explain node to execute the inner statement.
stmt.AST = explainBundle.Statement
ast = stmt.AST
@@ -359,92 +354,20 @@
// reflect the column types of the EXPLAIN itself and not those of the inner
// statement).
stmt.ExpectedTypes = nil

// EXPLAIN ANALYZE (DEBUG) does not return the rows for the given query;
// instead it returns some text which includes a URL.
// TODO(radu): maybe capture some of the rows and include them in the
// bundle.
p.discardRows = true
} else {
shouldCollectDiagnostics, diagRequestID, finishCollectionDiagnostics =
ex.stmtDiagnosticsRecorder.ShouldCollectDiagnostics(ctx, stmt.AnonymizedStr)
}

if shouldCollectDiagnostics {
p.collectBundle = true
tr := ex.server.cfg.AmbientCtx.Tracer
origCtx := ctx
var sp *tracing.Span
ctx, sp = tracing.StartSnowballTrace(ctx, tr, "traced statement")
// TODO(radu): consider removing this if/when #46164 is addressed.
p.extendedEvalCtx.Context = ctx
fingerprint := stmt.AnonymizedStr
var needFinish bool
ctx, needFinish = ih.Setup(
ctx, ex.server.cfg, ex.appStats, p, ex.stmtDiagnosticsRecorder,
stmt.AnonymizedStr, os.ImplicitTxn.Get(),
)
if needFinish {
sql := stmt.SQL
defer func() {
// Record the statement information that we've collected.
// Note that in case of implicit transactions, the trace contains the auto-commit too.
sp.Finish()
trace := sp.GetRecording()
ie := p.extendedEvalCtx.InternalExecutor.(*InternalExecutor)
placeholders := p.extendedEvalCtx.Placeholders
bundle := buildStatementBundle(
origCtx, ex.server.cfg.DB, ie, &p.curPlan, trace, placeholders,
)
bundle.insert(origCtx, fingerprint, ast, ex.server.cfg.StmtDiagnosticsRecorder, diagRequestID)
if finishCollectionDiagnostics != nil {
finishCollectionDiagnostics()
telemetry.Inc(sqltelemetry.StatementDiagnosticsCollectedCounter)
} else {
// Handle EXPLAIN ANALYZE (DEBUG).
// If there was a communication error already, no point in setting any results.
if retErr == nil {
retErr = setExplainBundleResult(origCtx, res, bundle, ex.server.cfg)
}
}

stmtStats, _ := ex.appStats.getStatsForStmt(fingerprint, ex.implicitTxn(), retErr, false)
if stmtStats == nil {
return
}

networkBytesSent := int64(0)
for _, flowInfo := range p.curPlan.distSQLFlowInfos {
analyzer := flowInfo.analyzer
if err := analyzer.AddTrace(trace); err != nil {
log.VInfof(ctx, 1, "error analyzing trace statistics for stmt %s: %v", ast, err)
continue
}

networkBytesSentGroupedByNode, err := analyzer.GetNetworkBytesSent()
if err != nil {
log.VInfof(ctx, 1, "error calculating network bytes sent for stmt %s: %v", ast, err)
continue
}
for _, bytesSentByNode := range networkBytesSentGroupedByNode {
networkBytesSent += bytesSentByNode
}
}

stmtStats.mu.Lock()
// Record trace-related statistics. A count of 1 is passed given that this
// statistic is only recorded when statement diagnostics are enabled.
// TODO(asubiotto): NumericStat properties will be properly calculated
// once this statistic is always collected.
stmtStats.mu.data.BytesSentOverNetwork.Record(1 /* count */, float64(networkBytesSent))
stmtStats.mu.Unlock()
retErr = ih.Finish(ex.server.cfg, ex.appStats, p, ast, sql, res, retErr)
}()
}

if ex.server.cfg.TestingKnobs.WithStatementTrace != nil {
tr := ex.server.cfg.AmbientCtx.Tracer
var sp *tracing.Span
sql := stmt.SQL
ctx, sp = tracing.StartSnowballTrace(ctx, tr, sql)
// TODO(radu): consider removing this if/when #46164 is addressed.
p.extendedEvalCtx.Context = ctx

defer func() {
ex.server.cfg.TestingKnobs.WithStatementTrace(sp.GetRecording(), sql)
}()
}

makeErrEvent := func(err error) (fsm.Event, fsm.EventPayload, error) {
@@ -588,7 +511,7 @@ func (ex *connExecutor) execStmtInOpenState(
res.ResetStmtType(ps.AST)

if s.DiscardRows {
p.discardRows = true
ih.SetDiscardRows()
}
}

@@ -943,13 +866,6 @@ func (ex *connExecutor) dispatchToExecutionEngine(
// makeExecPlan creates an execution plan and populates planner.curPlan using
// the cost-based optimizer.
func (ex *connExecutor) makeExecPlan(ctx context.Context, planner *planner) error {
savePlanString := planner.collectBundle
planner.curPlan.init(
&planner.stmt,
ex.appStats,
savePlanString,
)

if err := planner.makeOptimizerPlan(ctx); err != nil {
log.VEventf(ctx, 1, "optimizer plan failed: %v", err)
return err
@@ -1016,7 +932,7 @@ func (ex *connExecutor) execWithDistSQLEngine(
planCtx.stmtType = recv.stmtType
if ex.server.cfg.TestingKnobs.TestingSaveFlows != nil {
planCtx.saveFlows = ex.server.cfg.TestingKnobs.TestingSaveFlows(planner.stmt.SQL)
} else if planner.collectBundle {
} else if planner.instrumentation.ShouldCollectBundle() {
planCtx.saveFlows = planCtx.getDefaultSaveFlowsFunc(ctx, planner, planComponentTypeMainQuery)
}

@@ -1047,7 +963,7 @@ func (ex *connExecutor) execWithDistSQLEngine(
return recv.stats, recv.commErr
}
}
recv.discardRows = planner.discardRows
recv.discardRows = planner.instrumentation.ShouldDiscardRows()
// We pass in whether or not we wanted to distribute this plan, which tells
// the planner whether or not to plan remote table readers.
cleanup := ex.server.cfg.DistSQLPlanner.PlanAndRun(
4 changes: 2 additions & 2 deletions pkg/sql/distsql_running.go
@@ -834,7 +834,7 @@ func (dsp *DistSQLPlanner) planAndRunSubquery(
).WillDistribute()
subqueryPlanCtx := dsp.NewPlanningCtx(ctx, evalCtx, planner, planner.txn, distributeSubquery)
subqueryPlanCtx.stmtType = tree.Rows
if planner.collectBundle {
if planner.instrumentation.ShouldCollectBundle() {
subqueryPlanCtx.saveFlows = subqueryPlanCtx.getDefaultSaveFlowsFunc(ctx, planner, planComponentTypeSubquery)
}
// Don't close the top-level plan from subqueries - someone else will handle
@@ -1125,7 +1125,7 @@ func (dsp *DistSQLPlanner) planAndRunPostquery(
postqueryPlanCtx := dsp.NewPlanningCtx(ctx, evalCtx, planner, planner.txn, distributePostquery)
postqueryPlanCtx.stmtType = tree.Rows
postqueryPlanCtx.ignoreClose = true
if planner.collectBundle {
if planner.instrumentation.ShouldCollectBundle() {
postqueryPlanCtx.saveFlows = postqueryPlanCtx.getDefaultSaveFlowsFunc(ctx, planner, planComponentTypePostquery)
}

2 changes: 1 addition & 1 deletion pkg/sql/executor_statement_metrics.go
@@ -196,7 +196,7 @@ func (ex *connExecutor) recordStatementSummary(
}

stmtID := ex.statsCollector.recordStatement(
stmt, planner.curPlan.planForStats,
stmt, planner.instrumentation.PlanForStats(ctx),
flags.IsDistributed(), flags.IsSet(planFlagVectorized),
flags.IsSet(planFlagImplicitTxn), automaticRetryCount, rowsAffected, err,
parseLat, planLat, runLat, svcLat, execOverhead, stats,
7 changes: 4 additions & 3 deletions pkg/sql/explain_bundle.go
@@ -148,6 +148,7 @@ func buildStatementBundle(
db *kv.DB,
ie *InternalExecutor,
plan *planTop,
planString string,
trace tracing.Recording,
placeholders *tree.PlaceholderInfo,
) diagnosticsBundle {
@@ -158,7 +159,7 @@

b.addStatement()
b.addOptPlans()
b.addExecPlan()
b.addExecPlan(planString)
// TODO(yuzefovich): consider adding some variant of EXPLAIN (VEC) output
// of the query to the bundle.
b.addDistSQLDiagrams()
@@ -266,8 +267,8 @@ func (b *stmtBundleBuilder) addOptPlans() {
}

// addExecPlan adds the EXPLAIN (VERBOSE) plan as file plan.txt.
func (b *stmtBundleBuilder) addExecPlan() {
if plan := b.plan.planString; plan != "" {
func (b *stmtBundleBuilder) addExecPlan(plan string) {
if plan != "" {
b.z.AddFile("plan.txt", plan)
}
}
11 changes: 7 additions & 4 deletions pkg/sql/explain_tree_test.go
@@ -66,18 +66,21 @@ func TestPlanToTreeAndPlanToString(t *testing.T) {
defer cleanup()
p := internalPlanner.(*planner)

ih := &p.instrumentation
ih.codec = execCfg.Codec
ih.collectBundle = true
ih.savePlanForStats = true

p.stmt = makeStatement(stmt, ClusterWideID{})
p.curPlan.savePlanString = true
p.curPlan.savePlanForStats = true
if err := p.makeOptimizerPlan(ctx); err != nil {
t.Fatal(err)
}
p.curPlan.flags.Set(planFlagExecDone)
p.curPlan.close(ctx)
if d.Cmd == "plan-string" {
return p.curPlan.planString
return ih.planString()
}
treeYaml, err := yaml.Marshal(p.curPlan.planForStats)
treeYaml, err := yaml.Marshal(ih.PlanForStats(ctx))
if err != nil {
t.Fatal(err)
}