From 9c09da9139a99b69611341a389f5755f1b974b71 Mon Sep 17 00:00:00 2001 From: Radu Berinde Date: Thu, 5 Nov 2020 11:24:14 -0800 Subject: [PATCH 1/2] sql: refactor instrumentation of query execution This change separates the code around setting up tracing and collecting bundles into a new instrumentationHelper. The intention is that more related logic (like collecting plans for the UI) will be incorporated. Release note: None --- pkg/sql/BUILD.bazel | 1 + pkg/sql/conn_executor.go | 3 +- pkg/sql/conn_executor_exec.go | 102 ++--------------- pkg/sql/distsql_running.go | 4 +- pkg/sql/instrumentation.go | 209 ++++++++++++++++++++++++++++++++++ pkg/sql/planner.go | 12 +- 6 files changed, 226 insertions(+), 105 deletions(-) create mode 100644 pkg/sql/instrumentation.go diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index b4a32d8de21f..641b86c11bce 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -102,6 +102,7 @@ go_library( "information_schema.go", "insert.go", "insert_fast_path.go", + "instrumentation.go", "internal.go", "inverted_filter.go", "inverted_join.go", diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index a4f8596dbce1..9693d4f6176d 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -2179,6 +2179,7 @@ func (ex *connExecutor) resetPlanner( ) { p.txn = txn p.stmt = Statement{} + p.instrumentation = instrumentationHelper{} p.cancelChecker.Reset(ctx) @@ -2193,8 +2194,6 @@ func (ex *connExecutor) resetPlanner( p.autoCommit = false p.isPreparing = false p.avoidCachedDescriptors = false - p.discardRows = false - p.collectBundle = false } // txnStateTransitionsApplyWrapper is a wrapper on top of Machine built with the diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index 9f2d2599cfe4..8205c32e4d65 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -31,7 +31,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqlerrors" "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" - "github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/cancelchecker" @@ -340,15 +339,11 @@ func (ex *connExecutor) execStmtInOpenState( ex.resetPlanner(ctx, p, ex.state.mu.txn, stmtTS) p.sessionDataMutator.paramStatusUpdater = res p.noticeSender = res - - var shouldCollectDiagnostics bool - var diagRequestID stmtdiagnostics.RequestID - var finishCollectionDiagnostics func() + ih := &p.instrumentation if explainBundle, ok := ast.(*tree.ExplainAnalyzeDebug); ok { telemetry.Inc(sqltelemetry.ExplainAnalyzeDebugUseCounter) - // Always collect diagnostics for EXPLAIN ANALYZE (DEBUG). - shouldCollectDiagnostics = true + ih.SetOutputMode(explainAnalyzeDebugOutput) // Strip off the explain node to execute the inner statement. stmt.AST = explainBundle.Statement ast = stmt.AST @@ -359,92 +354,17 @@ func (ex *connExecutor) execStmtInOpenState( // reflect the column types of the EXPLAIN itself and not those of the inner // statement). stmt.ExpectedTypes = nil - - // EXPLAIN ANALYZE (DEBUG) does not return the rows for the given query; - // instead it returns some text which includes a URL. - // TODO(radu): maybe capture some of the rows and include them in the - // bundle. 
- p.discardRows = true - } else { - shouldCollectDiagnostics, diagRequestID, finishCollectionDiagnostics = - ex.stmtDiagnosticsRecorder.ShouldCollectDiagnostics(ctx, stmt.AnonymizedStr) } - if shouldCollectDiagnostics { - p.collectBundle = true - tr := ex.server.cfg.AmbientCtx.Tracer - origCtx := ctx - var sp *tracing.Span - ctx, sp = tracing.StartSnowballTrace(ctx, tr, "traced statement") - // TODO(radu): consider removing this if/when #46164 is addressed. - p.extendedEvalCtx.Context = ctx - fingerprint := stmt.AnonymizedStr + var needFinish bool + ctx, needFinish = ih.Setup(ctx, ex.server.cfg, p, ex.stmtDiagnosticsRecorder, stmt.AnonymizedStr, os.ImplicitTxn.Get()) + if needFinish { + sql := stmt.SQL defer func() { - // Record the statement information that we've collected. - // Note that in case of implicit transactions, the trace contains the auto-commit too. - sp.Finish() - trace := sp.GetRecording() - ie := p.extendedEvalCtx.InternalExecutor.(*InternalExecutor) - placeholders := p.extendedEvalCtx.Placeholders - bundle := buildStatementBundle( - origCtx, ex.server.cfg.DB, ie, &p.curPlan, trace, placeholders, - ) - bundle.insert(origCtx, fingerprint, ast, ex.server.cfg.StmtDiagnosticsRecorder, diagRequestID) - if finishCollectionDiagnostics != nil { - finishCollectionDiagnostics() - telemetry.Inc(sqltelemetry.StatementDiagnosticsCollectedCounter) - } else { - // Handle EXPLAIN ANALYZE (DEBUG). - // If there was a communication error already, no point in setting any results. - if retErr == nil { - retErr = setExplainBundleResult(origCtx, res, bundle, ex.server.cfg) - } - } - - stmtStats, _ := ex.appStats.getStatsForStmt(fingerprint, ex.implicitTxn(), retErr, false) - if stmtStats == nil { - return - } - - networkBytesSent := int64(0) - for _, flowInfo := range p.curPlan.distSQLFlowInfos { - analyzer := flowInfo.analyzer - if err := analyzer.AddTrace(trace); err != nil { - log.VInfof(ctx, 1, "error analyzing trace statistics for stmt %s: %v", ast, err) - continue - } - - networkBytesSentGroupedByNode, err := analyzer.GetNetworkBytesSent() - if err != nil { - log.VInfof(ctx, 1, "error calculating network bytes sent for stmt %s: %v", ast, err) - continue - } - for _, bytesSentByNode := range networkBytesSentGroupedByNode { - networkBytesSent += bytesSentByNode - } - } - - stmtStats.mu.Lock() - // Record trace-related statistics. A count of 1 is passed given that this - // statistic is only recorded when statement diagnostics are enabled. - // TODO(asubiotto): NumericStat properties will be properly calculated - // once this statistic is always collected. - stmtStats.mu.data.BytesSentOverNetwork.Record(1 /* count */, float64(networkBytesSent)) - stmtStats.mu.Unlock() + retErr = ih.Finish(ex.server.cfg, ex.appStats, p, ast, sql, res, retErr) }() - } - - if ex.server.cfg.TestingKnobs.WithStatementTrace != nil { - tr := ex.server.cfg.AmbientCtx.Tracer - var sp *tracing.Span - sql := stmt.SQL - ctx, sp = tracing.StartSnowballTrace(ctx, tr, sql) // TODO(radu): consider removing this if/when #46164 is addressed. 
p.extendedEvalCtx.Context = ctx - - defer func() { - ex.server.cfg.TestingKnobs.WithStatementTrace(sp.GetRecording(), sql) - }() } makeErrEvent := func(err error) (fsm.Event, fsm.EventPayload, error) { @@ -588,7 +508,7 @@ func (ex *connExecutor) execStmtInOpenState( res.ResetStmtType(ps.AST) if s.DiscardRows { - p.discardRows = true + ih.SetDiscardRows() } } @@ -943,7 +863,7 @@ func (ex *connExecutor) dispatchToExecutionEngine( // makeExecPlan creates an execution plan and populates planner.curPlan using // the cost-based optimizer. func (ex *connExecutor) makeExecPlan(ctx context.Context, planner *planner) error { - savePlanString := planner.collectBundle + savePlanString := planner.instrumentation.ShouldCollectBundle() planner.curPlan.init( &planner.stmt, ex.appStats, @@ -1016,7 +936,7 @@ func (ex *connExecutor) execWithDistSQLEngine( planCtx.stmtType = recv.stmtType if ex.server.cfg.TestingKnobs.TestingSaveFlows != nil { planCtx.saveFlows = ex.server.cfg.TestingKnobs.TestingSaveFlows(planner.stmt.SQL) - } else if planner.collectBundle { + } else if planner.instrumentation.ShouldCollectBundle() { planCtx.saveFlows = planCtx.getDefaultSaveFlowsFunc(ctx, planner, planComponentTypeMainQuery) } @@ -1047,7 +967,7 @@ func (ex *connExecutor) execWithDistSQLEngine( return recv.stats, recv.commErr } } - recv.discardRows = planner.discardRows + recv.discardRows = planner.instrumentation.ShouldDiscardRows() // We pass in whether or not we wanted to distribute this plan, which tells // the planner whether or not to plan remote table readers. cleanup := ex.server.cfg.DistSQLPlanner.PlanAndRun( diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go index 43a7e9749ca2..e1f38f7a40e3 100644 --- a/pkg/sql/distsql_running.go +++ b/pkg/sql/distsql_running.go @@ -834,7 +834,7 @@ func (dsp *DistSQLPlanner) planAndRunSubquery( ).WillDistribute() subqueryPlanCtx := dsp.NewPlanningCtx(ctx, evalCtx, planner, planner.txn, distributeSubquery) subqueryPlanCtx.stmtType = tree.Rows - if planner.collectBundle { + if planner.instrumentation.ShouldCollectBundle() { subqueryPlanCtx.saveFlows = subqueryPlanCtx.getDefaultSaveFlowsFunc(ctx, planner, planComponentTypeSubquery) } // Don't close the top-level plan from subqueries - someone else will handle @@ -1125,7 +1125,7 @@ func (dsp *DistSQLPlanner) planAndRunPostquery( postqueryPlanCtx := dsp.NewPlanningCtx(ctx, evalCtx, planner, planner.txn, distributePostquery) postqueryPlanCtx.stmtType = tree.Rows postqueryPlanCtx.ignoreClose = true - if planner.collectBundle { + if planner.instrumentation.ShouldCollectBundle() { postqueryPlanCtx.saveFlows = postqueryPlanCtx.getDefaultSaveFlowsFunc(ctx, planner, planComponentTypePostquery) } diff --git a/pkg/sql/instrumentation.go b/pkg/sql/instrumentation.go new file mode 100644 index 000000000000..96123a4d6dbe --- /dev/null +++ b/pkg/sql/instrumentation.go @@ -0,0 +1,209 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package sql + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/server/telemetry" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" + "github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/tracing" +) + +// instrumentationHelper encapsulates the logic around extracting information +// about the execution of a statement, like bundles and traces. Typical usage: +// +// - SetOutputMode() can be used as necessary if we are running an EXPLAIN +// ANALYZE variant. +// +// - Setup() is called before query execution. +// +// - SetDiscardRows(), ShouldDiscardRows(), ShouldCollectBundle() can be called +// at any point during execution. +// +// - Finish() is called after query execution. +// +type instrumentationHelper struct { + outputMode outputMode + + // Query fingerprint (anonymized statement). + fingerprint string + implicitTxn bool + + // -- The following fields are initialized by Setup() -- + + // collectBundle is set when we are collecting a diagnostics bundle for a + // statement; it triggers saving of extra information like the plan string. + collectBundle bool + + // discardRows is set if we want to discard any results rather than sending + // them back to the client. Used for testing/benchmarking. Note that the + // resulting schema or the plan are not affected. + // See EXECUTE .. DISCARD ROWS. + discardRows bool + + diagRequestID stmtdiagnostics.RequestID + finishCollectionDiagnostics func() + withStatementTrace func(trace tracing.Recording, stmt string) + + sp *tracing.Span + origCtx context.Context +} + +// outputMode indicates how the statement output needs to be populated (for +// EXPLAIN ANALYZE variants). +type outputMode int8 + +const ( + unmodifiedOutput outputMode = iota + explainAnalyzeDebugOutput +) + +// SetOutputMode can be called before Setup, if we are running an EXPLAIN +// ANALYZE variant. +func (ih *instrumentationHelper) SetOutputMode(outputMode outputMode) { + ih.outputMode = outputMode +} + +// Setup potentially enables snowball tracing for the statement, depending on +// output mode or statement diagnostic activation requests. Finish() must be +// called after the statement finishes execution (unless needFinish=false, in +// which case Finish() is a no-op). +func (ih *instrumentationHelper) Setup( + ctx context.Context, + cfg *ExecutorConfig, + p *planner, + stmtDiagnosticsRecorder *stmtdiagnostics.Registry, + fingerprint string, + implicitTxn bool, +) (newCtx context.Context, needFinish bool) { + ih.fingerprint = fingerprint + ih.implicitTxn = implicitTxn + + if ih.outputMode == explainAnalyzeDebugOutput { + ih.collectBundle = true + // EXPLAIN ANALYZE (DEBUG) does not return the rows for the given query; + // instead it returns some text which includes a URL. + // TODO(radu): maybe capture some of the rows and include them in the + // bundle. + ih.discardRows = true + } else { + ih.collectBundle, ih.diagRequestID, ih.finishCollectionDiagnostics = + stmtDiagnosticsRecorder.ShouldCollectDiagnostics(ctx, fingerprint) + } + + ih.withStatementTrace = cfg.TestingKnobs.WithStatementTrace + + // TODO(radu): logic around saving plans for stats should be here. 
+ + if !ih.collectBundle && ih.withStatementTrace == nil { + return ctx, false + } + + ih.origCtx = ctx + newCtx, ih.sp = tracing.StartSnowballTrace(ctx, cfg.AmbientCtx.Tracer, "traced statement") + return newCtx, true +} + +func (ih *instrumentationHelper) Finish( + cfg *ExecutorConfig, + appStats *appStats, + p *planner, + ast tree.Statement, + stmtRawSQL string, + res RestrictedCommandResult, + retErr error, +) error { + if ih.sp == nil { + return retErr + } + + // Record the statement information that we've collected. + // Note that in case of implicit transactions, the trace contains the auto-commit too. + ih.sp.Finish() + ctx := ih.origCtx + + trace := ih.sp.GetRecording() + ie := p.extendedEvalCtx.InternalExecutor.(*InternalExecutor) + placeholders := p.extendedEvalCtx.Placeholders + bundle := buildStatementBundle( + ih.origCtx, cfg.DB, ie, &p.curPlan, trace, placeholders, + ) + bundle.insert(ctx, ih.fingerprint, ast, cfg.StmtDiagnosticsRecorder, ih.diagRequestID) + if ih.finishCollectionDiagnostics != nil { + ih.finishCollectionDiagnostics() + telemetry.Inc(sqltelemetry.StatementDiagnosticsCollectedCounter) + } + if ih.withStatementTrace != nil { + ih.withStatementTrace(trace, stmtRawSQL) + } + + // If there was a communication error already, no point in setting any results. + if retErr == nil { + switch ih.outputMode { + case explainAnalyzeDebugOutput: + // Handle EXPLAIN ANALYZE (DEBUG). + retErr = setExplainBundleResult(ctx, res, bundle, cfg) + } + } + + // TODO(radu): this should be unified with other stmt stats accesses. + stmtStats, _ := appStats.getStatsForStmt(ih.fingerprint, ih.implicitTxn, retErr, false) + if stmtStats != nil { + networkBytesSent := int64(0) + for _, flowInfo := range p.curPlan.distSQLFlowInfos { + analyzer := flowInfo.analyzer + if err := analyzer.AddTrace(trace); err != nil { + log.VInfof(ctx, 1, "error analyzing trace statistics for stmt %s: %v", ast, err) + continue + } + + networkBytesSentGroupedByNode, err := analyzer.GetNetworkBytesSent() + if err != nil { + log.VInfof(ctx, 1, "error calculating network bytes sent for stmt %s: %v", ast, err) + continue + } + for _, bytesSentByNode := range networkBytesSentGroupedByNode { + networkBytesSent += bytesSentByNode + } + } + + stmtStats.mu.Lock() + // Record trace-related statistics. A count of 1 is passed given that this + // statistic is only recorded when statement diagnostics are enabled. + // TODO(asubiotto): NumericStat properties will be properly calculated + // once this statistic is always collected. + stmtStats.mu.data.BytesSentOverNetwork.Record(1 /* count */, float64(networkBytesSent)) + stmtStats.mu.Unlock() + } + + return retErr +} + +// SetDiscardRows should be called when we want to discard rows for a +// non-ANALYZE statement (via EXECUTE .. DISCARD ROWS. +func (ih *instrumentationHelper) SetDiscardRows() { + ih.discardRows = true +} + +// ShouldDiscardRows returns true if this is an EXPLAIN ANALYZE variant or +// SetDiscardRows() was called. +func (ih *instrumentationHelper) ShouldDiscardRows() bool { + return ih.discardRows +} + +// ShouldCollectBundle is true if we are collecting a support bundle. +func (ih *instrumentationHelper) ShouldCollectBundle() bool { + return ih.collectBundle +} diff --git a/pkg/sql/planner.go b/pkg/sql/planner.go index c579c625c1d3..55fa9aa7673a 100644 --- a/pkg/sql/planner.go +++ b/pkg/sql/planner.go @@ -140,6 +140,8 @@ type planner struct { // Corresponding Statement for this query. 
stmt Statement + instrumentation instrumentationHelper + // Contexts for different stages of planning and execution. semaCtx tree.SemaContext extendedEvalCtx extendedEvalContext @@ -175,20 +177,10 @@ type planner struct { // auto-commit. This is dependent on information from the optimizer. autoCommit bool - // discardRows is set if we want to discard any results rather than sending - // them back to the client. Used for testing/benchmarking. Note that the - // resulting schema or the plan are not affected. - // See EXECUTE .. DISCARD ROWS. - discardRows bool - // cancelChecker is used by planNodes to check for cancellation of the associated // query. cancelChecker *cancelchecker.CancelChecker - // collectBundle is set when we are collecting a diagnostics bundle for a - // statement; it triggers saving of extra information like the plan string. - collectBundle bool - // isPreparing is true if this planner is currently preparing. isPreparing bool From 8c274035daf9004e3510f072cac64ceb2fbecb94 Mon Sep 17 00:00:00 2001 From: Radu Berinde Date: Thu, 5 Nov 2020 11:24:14 -0800 Subject: [PATCH 2/2] sql: move more logic into instrumentationHelper This commit moves the logic around collecting explain plans (for bundles and for UI) into `instrumentationHelper`. Release note: None --- pkg/sql/conn_executor_exec.go | 12 +-- pkg/sql/executor_statement_metrics.go | 2 +- pkg/sql/explain_bundle.go | 7 +- pkg/sql/explain_tree_test.go | 11 ++- pkg/sql/instrumentation.go | 108 +++++++++++++++++++++----- pkg/sql/plan.go | 54 ++----------- pkg/sql/plan_opt.go | 25 ++---- 7 files changed, 116 insertions(+), 103 deletions(-) diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index 8205c32e4d65..89918eadac7d 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -357,7 +357,10 @@ func (ex *connExecutor) execStmtInOpenState( } var needFinish bool - ctx, needFinish = ih.Setup(ctx, ex.server.cfg, p, ex.stmtDiagnosticsRecorder, stmt.AnonymizedStr, os.ImplicitTxn.Get()) + ctx, needFinish = ih.Setup( + ctx, ex.server.cfg, ex.appStats, p, ex.stmtDiagnosticsRecorder, + stmt.AnonymizedStr, os.ImplicitTxn.Get(), + ) if needFinish { sql := stmt.SQL defer func() { @@ -863,13 +866,6 @@ func (ex *connExecutor) dispatchToExecutionEngine( // makeExecPlan creates an execution plan and populates planner.curPlan using // the cost-based optimizer. 
func (ex *connExecutor) makeExecPlan(ctx context.Context, planner *planner) error { - savePlanString := planner.instrumentation.ShouldCollectBundle() - planner.curPlan.init( - &planner.stmt, - ex.appStats, - savePlanString, - ) - if err := planner.makeOptimizerPlan(ctx); err != nil { log.VEventf(ctx, 1, "optimizer plan failed: %v", err) return err diff --git a/pkg/sql/executor_statement_metrics.go b/pkg/sql/executor_statement_metrics.go index 0f0961157b59..a3084a9944a9 100644 --- a/pkg/sql/executor_statement_metrics.go +++ b/pkg/sql/executor_statement_metrics.go @@ -196,7 +196,7 @@ func (ex *connExecutor) recordStatementSummary( } stmtID := ex.statsCollector.recordStatement( - stmt, planner.curPlan.planForStats, + stmt, planner.instrumentation.PlanForStats(ctx), flags.IsDistributed(), flags.IsSet(planFlagVectorized), flags.IsSet(planFlagImplicitTxn), automaticRetryCount, rowsAffected, err, parseLat, planLat, runLat, svcLat, execOverhead, stats, diff --git a/pkg/sql/explain_bundle.go b/pkg/sql/explain_bundle.go index a1ad18a4f3e4..834be0a235dc 100644 --- a/pkg/sql/explain_bundle.go +++ b/pkg/sql/explain_bundle.go @@ -148,6 +148,7 @@ func buildStatementBundle( db *kv.DB, ie *InternalExecutor, plan *planTop, + planString string, trace tracing.Recording, placeholders *tree.PlaceholderInfo, ) diagnosticsBundle { @@ -158,7 +159,7 @@ func buildStatementBundle( b.addStatement() b.addOptPlans() - b.addExecPlan() + b.addExecPlan(planString) // TODO(yuzefovich): consider adding some variant of EXPLAIN (VEC) output // of the query to the bundle. b.addDistSQLDiagrams() @@ -266,8 +267,8 @@ func (b *stmtBundleBuilder) addOptPlans() { } // addExecPlan adds the EXPLAIN (VERBOSE) plan as file plan.txt. -func (b *stmtBundleBuilder) addExecPlan() { - if plan := b.plan.planString; plan != "" { +func (b *stmtBundleBuilder) addExecPlan(plan string) { + if plan != "" { b.z.AddFile("plan.txt", plan) } } diff --git a/pkg/sql/explain_tree_test.go b/pkg/sql/explain_tree_test.go index 9d0bf717fda9..b25cfd6c8ac9 100644 --- a/pkg/sql/explain_tree_test.go +++ b/pkg/sql/explain_tree_test.go @@ -66,18 +66,21 @@ func TestPlanToTreeAndPlanToString(t *testing.T) { defer cleanup() p := internalPlanner.(*planner) + ih := &p.instrumentation + ih.codec = execCfg.Codec + ih.collectBundle = true + ih.savePlanForStats = true + p.stmt = makeStatement(stmt, ClusterWideID{}) - p.curPlan.savePlanString = true - p.curPlan.savePlanForStats = true if err := p.makeOptimizerPlan(ctx); err != nil { t.Fatal(err) } p.curPlan.flags.Set(planFlagExecDone) p.curPlan.close(ctx) if d.Cmd == "plan-string" { - return p.curPlan.planString + return ih.planString() } - treeYaml, err := yaml.Marshal(p.curPlan.planForStats) + treeYaml, err := yaml.Marshal(ih.PlanForStats(ctx)) if err != nil { t.Fatal(err) } diff --git a/pkg/sql/instrumentation.go b/pkg/sql/instrumentation.go index 96123a4d6dbe..84b237f93eb6 100644 --- a/pkg/sql/instrumentation.go +++ b/pkg/sql/instrumentation.go @@ -12,8 +12,13 @@ package sql import ( "context" + "fmt" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server/telemetry" + "github.com/cockroachdb/cockroach/pkg/sql/opt/exec/explain" + "github.com/cockroachdb/cockroach/pkg/sql/physicalplan" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" "github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics" @@ -29,8 +34,9 @@ import ( // // - Setup() is called before query execution. 
// -// - SetDiscardRows(), ShouldDiscardRows(), ShouldCollectBundle() can be called -// at any point during execution. +// - SetDiscardRows(), ShouldDiscardRows(), ShouldCollectBundle(), +// ShouldBuildExplainPlan(), RecordExplainPlan(), RecordPlanInfo(), +// PlanForStats() can be called at any point during execution. // // - Finish() is called after query execution. // @@ -40,6 +46,7 @@ type instrumentationHelper struct { // Query fingerprint (anonymized statement). fingerprint string implicitTxn bool + codec keys.SQLCodec // -- The following fields are initialized by Setup() -- @@ -59,6 +66,14 @@ type instrumentationHelper struct { sp *tracing.Span origCtx context.Context + + // If savePlanForStats is true, the explainPlan will be collected and returned + // via PlanForStats(). + savePlanForStats bool + + explainPlan *explain.Plan + distribution physicalplan.PlanDistribution + vectorized bool } // outputMode indicates how the statement output needs to be populated (for @@ -83,6 +98,7 @@ func (ih *instrumentationHelper) SetOutputMode(outputMode outputMode) { func (ih *instrumentationHelper) Setup( ctx context.Context, cfg *ExecutorConfig, + appStats *appStats, p *planner, stmtDiagnosticsRecorder *stmtdiagnostics.Registry, fingerprint string, @@ -90,6 +106,7 @@ func (ih *instrumentationHelper) Setup( ) (newCtx context.Context, needFinish bool) { ih.fingerprint = fingerprint ih.implicitTxn = implicitTxn + ih.codec = cfg.Codec if ih.outputMode == explainAnalyzeDebugOutput { ih.collectBundle = true @@ -105,7 +122,7 @@ func (ih *instrumentationHelper) Setup( ih.withStatementTrace = cfg.TestingKnobs.WithStatementTrace - // TODO(radu): logic around saving plans for stats should be here. + ih.savePlanForStats = appStats.shouldSaveLogicalPlanDescription(fingerprint, implicitTxn) if !ih.collectBundle && ih.withStatementTrace == nil { return ctx, false @@ -137,26 +154,25 @@ func (ih *instrumentationHelper) Finish( trace := ih.sp.GetRecording() ie := p.extendedEvalCtx.InternalExecutor.(*InternalExecutor) placeholders := p.extendedEvalCtx.Placeholders - bundle := buildStatementBundle( - ih.origCtx, cfg.DB, ie, &p.curPlan, trace, placeholders, - ) - bundle.insert(ctx, ih.fingerprint, ast, cfg.StmtDiagnosticsRecorder, ih.diagRequestID) - if ih.finishCollectionDiagnostics != nil { - ih.finishCollectionDiagnostics() - telemetry.Inc(sqltelemetry.StatementDiagnosticsCollectedCounter) - } - if ih.withStatementTrace != nil { - ih.withStatementTrace(trace, stmtRawSQL) - } + if ih.collectBundle { + bundle := buildStatementBundle( + ih.origCtx, cfg.DB, ie, &p.curPlan, ih.planString(), trace, placeholders, + ) + bundle.insert(ctx, ih.fingerprint, ast, cfg.StmtDiagnosticsRecorder, ih.diagRequestID) + if ih.finishCollectionDiagnostics != nil { + ih.finishCollectionDiagnostics() + telemetry.Inc(sqltelemetry.StatementDiagnosticsCollectedCounter) + } - // If there was a communication error already, no point in setting any results. - if retErr == nil { - switch ih.outputMode { - case explainAnalyzeDebugOutput: - // Handle EXPLAIN ANALYZE (DEBUG). + // Handle EXPLAIN ANALYZE (DEBUG). If there was a communication error + // already, no point in setting any results. + if ih.outputMode == explainAnalyzeDebugOutput && retErr == nil { retErr = setExplainBundleResult(ctx, res, bundle, cfg) } } + if ih.withStatementTrace != nil { + ih.withStatementTrace(trace, stmtRawSQL) + } // TODO(radu): this should be unified with other stmt stats accesses. 
stmtStats, _ := appStats.getStatsForStmt(ih.fingerprint, ih.implicitTxn, retErr, false) @@ -192,7 +208,7 @@ func (ih *instrumentationHelper) Finish( } // SetDiscardRows should be called when we want to discard rows for a -// non-ANALYZE statement (via EXECUTE .. DISCARD ROWS. +// non-ANALYZE statement (via EXECUTE .. DISCARD ROWS). func (ih *instrumentationHelper) SetDiscardRows() { ih.discardRows = true } @@ -207,3 +223,55 @@ func (ih *instrumentationHelper) ShouldDiscardRows() bool { func (ih *instrumentationHelper) ShouldCollectBundle() bool { return ih.collectBundle } + +// ShouldBuildExplainPlan returns true if we should build an explain plan and +// call RecordExplainPlan. +func (ih *instrumentationHelper) ShouldBuildExplainPlan() bool { + return ih.collectBundle || ih.savePlanForStats +} + +// RecordExplainPlan records the explain.Plan for this query. +func (ih *instrumentationHelper) RecordExplainPlan(explainPlan *explain.Plan) { + ih.explainPlan = explainPlan +} + +// RecordPlanInfo records top-level information about the plan. +func (ih *instrumentationHelper) RecordPlanInfo( + distribution physicalplan.PlanDistribution, vectorized bool, +) { + ih.distribution = distribution + ih.vectorized = vectorized +} + +// PlanForStats returns the plan as an ExplainTreePlanNode tree, if it was +// collected (nil otherwise). It should be called after RecordExplainPlan() and +// RecordPlanInfo(). +func (ih *instrumentationHelper) PlanForStats(ctx context.Context) *roachpb.ExplainTreePlanNode { + if ih.explainPlan == nil { + return nil + } + + ob := explain.NewOutputBuilder(explain.Flags{ + HideValues: true, + }) + if err := emitExplain(ob, ih.codec, ih.explainPlan, ih.distribution, ih.vectorized); err != nil { + log.Warningf(ctx, "unable to emit explain plan tree: %v", err) + return nil + } + return ob.BuildProtoTree() +} + +// planString generates the plan tree as a string; used internally for bundles. +func (ih *instrumentationHelper) planString() string { + if ih.explainPlan == nil { + return "" + } + ob := explain.NewOutputBuilder(explain.Flags{ + Verbose: true, + ShowTypes: true, + }) + if err := emitExplain(ob, ih.codec, ih.explainPlan, ih.distribution, ih.vectorized); err != nil { + return fmt.Sprintf("error emitting plan: %v", err) + } + return ob.BuildString() +} diff --git a/pkg/sql/plan.go b/pkg/sql/plan.go index 73a435342734..2c54453448bf 100644 --- a/pkg/sql/plan.go +++ b/pkg/sql/plan.go @@ -12,22 +12,17 @@ package sql import ( "context" - "fmt" - "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" - "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/execstats" "github.com/cockroachdb/cockroach/pkg/sql/opt/exec" - "github.com/cockroachdb/cockroach/pkg/sql/opt/exec/explain" "github.com/cockroachdb/cockroach/pkg/sql/opt/memo" "github.com/cockroachdb/cockroach/pkg/sql/physicalplan" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" - "github.com/cockroachdb/cockroach/pkg/util/log" ) // runParams is a struct containing all parameters passed to planNode.Next() and @@ -294,9 +289,6 @@ type planTop struct { mem *memo.Memo catalog *optCatalog - // codec is populated during planning. 
- codec keys.SQLCodec - // auditEvents becomes non-nil if any of the descriptors used by // current statement is causing an auditing event. See exec_log.go. auditEvents []auditEvent @@ -315,18 +307,7 @@ type planTop struct { // diagrams, are saved here. distSQLFlowInfos []flowInfo - // If savePlanForStats is true, an ExplainTreePlanNode tree will be saved in - // planForStats when the plan is closed. - savePlanForStats bool - // appStats is used to populate savePlanForStats. - appStats *appStats - planForStats *roachpb.ExplainTreePlanNode - - // If savePlanString is set to true, an EXPLAIN (VERBOSE)-style plan string - // will be saved in planString when the plan is closed. - savePlanString bool - planString string - explainPlan *explain.Plan + instrumentation *instrumentationHelper } // physicalPlanTop is a utility wrapper around PhysicalPlan that allows for @@ -459,17 +440,16 @@ func (p *planComponents) close(ctx context.Context) { // init resets planTop to point to a given statement; used at the start of the // planning process. -func (p *planTop) init(stmt *Statement, appStats *appStats, savePlanString bool) { +func (p *planTop) init(stmt *Statement, instrumentation *instrumentationHelper) { *p = planTop{ - stmt: stmt, - appStats: appStats, - savePlanString: savePlanString, + stmt: stmt, + instrumentation: instrumentation, } } // close ensures that the plan's resources have been deallocated. func (p *planTop) close(ctx context.Context) { - if p.explainPlan != nil && p.flags.IsSet(planFlagExecDone) { + if p.flags.IsSet(planFlagExecDone) { p.savePlanInfo(ctx) } p.planComponents.close(ctx) @@ -484,29 +464,7 @@ func (p *planTop) savePlanInfo(ctx context.Context) { } else if p.flags.IsSet(planFlagPartiallyDistributed) { distribution = physicalplan.PartiallyDistributedPlan } - - if p.savePlanForStats { - ob := explain.NewOutputBuilder(explain.Flags{ - HideValues: true, - }) - if err := emitExplain(ob, p.codec, p.explainPlan, distribution, vectorized); err != nil { - log.Warningf(ctx, "unable to emit explain plan tree: %v", err) - } else { - p.planForStats = ob.BuildProtoTree() - } - } - - if p.savePlanString { - ob := explain.NewOutputBuilder(explain.Flags{ - Verbose: true, - ShowTypes: true, - }) - if err := emitExplain(ob, p.codec, p.explainPlan, distribution, vectorized); err != nil { - p.planString = fmt.Sprintf("error emitting plan: %v", err) - } else { - p.planString = ob.BuildString() - } - } + p.instrumentation.RecordPlanInfo(distribution, vectorized) } // formatOptPlan returns a visual representation of the optimizer plan that was diff --git a/pkg/sql/plan_opt.go b/pkg/sql/plan_opt.go index b26a527039bf..c76e6a421af4 100644 --- a/pkg/sql/plan_opt.go +++ b/pkg/sql/plan_opt.go @@ -14,7 +14,6 @@ import ( "context" "strings" - "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" @@ -186,6 +185,8 @@ func (p *planner) prepareUsingOptimizer(ctx context.Context) (planFlags, error) // makeOptimizerPlan generates a plan using the cost-based optimizer. // On success, it populates p.curPlan. 
func (p *planner) makeOptimizerPlan(ctx context.Context) error { + p.curPlan.init(&p.stmt, &p.instrumentation) + opc := &p.optPlanningCtx opc.reset() @@ -209,7 +210,6 @@ func (p *planner) makeOptimizerPlan(ctx context.Context) error { newDistSQLSpecExecFactory(p, planningMode), execMemo, p.EvalContext(), - p.ExecCfg().Codec, p.autoCommit, ) if err != nil { @@ -244,7 +244,6 @@ func (p *planner) makeOptimizerPlan(ctx context.Context) error { newDistSQLSpecExecFactory(p, distSQLLocalOnlyPlanning), execMemo, p.EvalContext(), - p.ExecCfg().Codec, p.autoCommit, ) } @@ -265,7 +264,6 @@ func (p *planner) makeOptimizerPlan(ctx context.Context) error { newExecFactory(p), execMemo, p.EvalContext(), - p.ExecCfg().Codec, p.autoCommit, ) } @@ -543,24 +541,13 @@ func (opc *optPlanningCtx) runExecBuilder( f exec.Factory, mem *memo.Memo, evalCtx *tree.EvalContext, - codec keys.SQLCodec, allowAutoCommit bool, ) error { var result *planComponents - var explainPlan *explain.Plan var isDDL bool var containsFullTableScan bool var containsFullIndexScan bool - if planTop.appStats != nil { - // We do not set this flag upfront when initializing planTop because the - // planning process could in principle modify the AST, resulting in a - // different statement signature. - planTop.savePlanForStats = planTop.appStats.shouldSaveLogicalPlanDescription( - planTop.stmt.AnonymizedStr, - allowAutoCommit, - ) - } - if !planTop.savePlanString && !planTop.savePlanForStats { + if !planTop.instrumentation.ShouldBuildExplainPlan() { // No instrumentation. bld := execbuilder.New(f, mem, &opc.catalog, mem.RootExpr(), evalCtx, allowAutoCommit) plan, err := bld.Build() @@ -579,11 +566,13 @@ func (opc *optPlanningCtx) runExecBuilder( if err != nil { return err } - explainPlan = plan.(*explain.Plan) + explainPlan := plan.(*explain.Plan) result = explainPlan.WrappedPlan.(*planComponents) isDDL = bld.IsDDL containsFullTableScan = bld.ContainsFullTableScan containsFullIndexScan = bld.ContainsFullIndexScan + + planTop.instrumentation.RecordExplainPlan(explainPlan) } if stmt.ExpectedTypes != nil { @@ -594,10 +583,8 @@ func (opc *optPlanningCtx) runExecBuilder( } planTop.planComponents = *result - planTop.explainPlan = explainPlan planTop.mem = mem planTop.catalog = &opc.catalog - planTop.codec = codec planTop.stmt = stmt planTop.flags = opc.flags if isDDL {
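
As a rough illustration of the lifecycle these two patches introduce, the sketch below mirrors the Setup/Finish pattern used in execStmtInOpenState: choose an output mode before Setup, let Setup decide whether instrumentation is needed, and defer Finish only when it is. This is a simplified, self-contained stand-in, not CockroachDB code; the instrumentationHelperSketch type, execStmt, and the context-key tracing placeholder are hypothetical substitutes for the real planner, ExecutorConfig, tracer, and appStats plumbing.

    package main

    // Simplified sketch of the Setup/Finish lifecycle around statement
    // execution. All types here are hypothetical stand-ins for the real
    // instrumentationHelper and its dependencies.

    import (
            "context"
            "fmt"
    )

    type outputMode int8

    const (
            unmodifiedOutput outputMode = iota
            explainAnalyzeDebugOutput
    )

    // ctxKey marks a context as "traced" in place of a real trace span.
    type ctxKey struct{}

    type instrumentationHelperSketch struct {
            outputMode    outputMode
            collectBundle bool
            discardRows   bool
            fingerprint   string
    }

    // SetOutputMode is called before Setup for EXPLAIN ANALYZE variants.
    func (ih *instrumentationHelperSketch) SetOutputMode(m outputMode) { ih.outputMode = m }

    // Setup decides whether any instrumentation is needed and, if so,
    // returns a derived context (the real helper starts a snowball trace).
    func (ih *instrumentationHelperSketch) Setup(
            ctx context.Context, fingerprint string,
    ) (newCtx context.Context, needFinish bool) {
            ih.fingerprint = fingerprint
            if ih.outputMode == explainAnalyzeDebugOutput {
                    // EXPLAIN ANALYZE (DEBUG): collect a bundle and discard rows.
                    ih.collectBundle = true
                    ih.discardRows = true
            }
            if !ih.collectBundle {
                    return ctx, false
            }
            return context.WithValue(ctx, ctxKey{}, true), true
    }

    // Finish records what was collected; the real helper builds and inserts
    // the bundle, reports trace statistics, and may replace the result with
    // the bundle URL text.
    func (ih *instrumentationHelperSketch) Finish(retErr error) error {
            fmt.Printf("finishing instrumentation for %q (bundle=%t)\n",
                    ih.fingerprint, ih.collectBundle)
            return retErr
    }

    // execStmt shows the caller-side pattern: Setup before execution and a
    // deferred Finish only when needFinish is true.
    func execStmt(ctx context.Context, fingerprint string, debug bool) (retErr error) {
            ih := &instrumentationHelperSketch{}
            if debug {
                    ih.SetOutputMode(explainAnalyzeDebugOutput)
            }
            ctx, needFinish := ih.Setup(ctx, fingerprint)
            if needFinish {
                    defer func() { retErr = ih.Finish(retErr) }()
            }
            _ = ctx // execution would run with the (possibly traced) context here
            return nil
    }

    func main() {
            _ = execStmt(context.Background(), "SELECT _ FROM t", true /* debug */)
    }

The caller-side shape matches the refactored execStmtInOpenState: the helper owns the decision of whether to trace, and the deferred Finish receives and may replace the statement's error, just as ih.Finish does with retErr in the patch.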