From 6cdaedac43a8f82fb3a89814db8e3196eae208da Mon Sep 17 00:00:00 2001 From: Radu Berinde Date: Fri, 12 Feb 2021 18:27:38 -0500 Subject: [PATCH] sql: add nodes for each EXPLAIN ANALYZE operator MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Show the cluster nodes involved in the execution of each operator. Note that this info does not show up in the non-analyze EXPLAIN. It is technically much more challenging to do that because of the indirect way we do distsql planning. Once we have the new DistSQL exec factory, we will be able to add it. Note on the implementation: I started by trying to make execution set `ComponentID.NodeID` in all cases, but I got stuck in `ProcessorBase` where we only have a `SQLIDContainer` available. I don't fully understand the new abstraction and whether the distsql components and flows should really use SQLIDs instead of NodeIDs. Unfortunately, there is not much we can do to test this currently (other than manual testing). I will investigate making the "deterministic" flag more fine-grained, so that we can hide truly non-deterministic values (like timings) separately from those that just vary with the configuration (query distribution). Example: ``` movr> EXPLAIN ANALYZE SELECT * FROM rides JOIN vehicles ON rides.vehicle_id = vehicles.id; info -------------------------------------------- planning time: 158µs execution time: 7ms distribution: full vectorized: true hash join │ cluster nodes: n1, n2, n3 │ actual row count: 500 │ equality: (vehicle_id) = (id) │ ├── scan │ cluster nodes: n1, n2, n3 │ actual row count: 500 │ KV rows read: 500 │ KV bytes read: 86 KiB │ missing stats │ table: rides@primary │ spans: FULL SCAN │ └── scan cluster nodes: n1, n2, n3 actual row count: 15 KV rows read: 15 KV bytes read: 2.3 KiB missing stats table: vehicles@primary spans: FULL SCAN ``` Release note (sql change): EXPLAIN ANALYZE now includes the nodes which were involved in the execution of each operator in the tree. --- pkg/sql/distsql_physical_planner.go | 7 ++++-- pkg/sql/instrumentation.go | 24 +++++++++++++++---- .../testdata/logic_test/dist_vectorize | 5 ++++ .../testdata/logic_test/explain_analyze | 5 ++++ .../testdata/logic_test/explain_analyze_plans | 24 +++++++++++++++++++ .../logic_test/inverted_index_geospatial | 15 ++++++++++++ .../testdata/logic_test/vectorize_local | 6 +++++ pkg/sql/opt/exec/explain/emit.go | 3 +++ pkg/sql/opt/exec/explain/output.go | 9 +++++++ pkg/sql/opt/exec/factory.go | 3 +++ 10 files changed, 95 insertions(+), 6 deletions(-) diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index 83b1b8fc75e8..540b1e8101f5 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -2724,11 +2724,14 @@ func (dsp *DistSQLPlanner) createPhysPlanForPlanNode( if planCtx.traceMetadata != nil { processors := make(execComponents, len(plan.ResultRouters)) - for i := range plan.ResultRouters { + for i, resultProcIdx := range plan.ResultRouters { processors[i] = execinfrapb.ProcessorComponentID( execinfrapb.FlowID{UUID: planCtx.infra.FlowID}, - int32(plan.ResultRouters[i]), + int32(resultProcIdx), ) + // TODO(radu): we should set NodeID in all cases, as part of + // ProcessorComponentID. + processors[i].NodeID = plan.Processors[resultProcIdx].Node } planCtx.traceMetadata.associateNodeWithComponents(node, processors) } diff --git a/pkg/sql/instrumentation.go b/pkg/sql/instrumentation.go index 2c278d98cc50..57b1a6841b7b 100644 --- a/pkg/sql/instrumentation.go +++ b/pkg/sql/instrumentation.go @@ -28,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" "github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics" + "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" @@ -219,7 +220,12 @@ func (ih *instrumentationHelper) Finish( } if ih.traceMetadata != nil && ih.explainPlan != nil { - ih.traceMetadata.annotateExplain(ih.explainPlan, trace, cfg.TestingKnobs.DeterministicExplainAnalyze) + ih.traceMetadata.annotateExplain( + ih.explainPlan, + p.curPlan.distSQLFlowInfos, + trace, + cfg.TestingKnobs.DeterministicExplainAnalyze, + ) } // TODO(radu): this should be unified with other stmt stats accesses. @@ -469,7 +475,7 @@ func (m execNodeTraceMetadata) associateNodeWithComponents( // annotateExplain aggregates the statistics in the trace and annotates // explain.Nodes with execution stats. func (m execNodeTraceMetadata) annotateExplain( - plan *explain.Plan, spans []tracingpb.RecordedSpan, makeDeterministic bool, + plan *explain.Plan, flowInfos []flowInfo, spans []tracingpb.RecordedSpan, makeDeterministic bool, ) { statsMap := execinfrapb.ExtractStatsFromSpans(spans, makeDeterministic) @@ -480,8 +486,15 @@ func (m execNodeTraceMetadata) annotateExplain( var nodeStats exec.ExecutionStats incomplete := false - for i := range components { - stats := statsMap[components[i]] + var nodes util.FastIntSet + for _, c := range components { + if c.Type == execinfrapb.ComponentID_PROCESSOR { + nodes.Add(int(c.NodeID)) + // Clear the NodeID, which is not set for processors. + // TODO(radu): remove this hack when execution sets NodeID for all components. + c.NodeID = 0 + } + stats := statsMap[c] if stats == nil { incomplete = true break @@ -494,6 +507,9 @@ func (m execNodeTraceMetadata) annotateExplain( // incomplete results. In the future, we may consider an incomplete flag // if we want to show them with a warning. if !incomplete { + for i, ok := nodes.Next(0); ok; i, ok = nodes.Next(i + 1) { + nodeStats.Nodes = append(nodeStats.Nodes, fmt.Sprintf("n%d", i)) + } n.Annotate(exec.ExecutionStatsID, &nodeStats) } } diff --git a/pkg/sql/logictest/testdata/logic_test/dist_vectorize b/pkg/sql/logictest/testdata/logic_test/dist_vectorize index ce2244803a9b..9aebf8aa4b79 100644 --- a/pkg/sql/logictest/testdata/logic_test/dist_vectorize +++ b/pkg/sql/logictest/testdata/logic_test/dist_vectorize @@ -57,9 +57,11 @@ distribution: vectorized: · • group (scalar) +│ cluster nodes: │ actual row count: 1 │ └── • scan + cluster nodes: actual row count: 5 KV rows read: 5 KV bytes read: 40 B @@ -80,12 +82,14 @@ distribution: vectorized: · • merge join +│ cluster nodes: │ actual row count: 5 │ equality: (k) = (k) │ left cols are key │ right cols are key │ ├── • scan +│ cluster nodes: │ actual row count: 5 │ KV rows read: 5 │ KV bytes read: 40 B @@ -94,6 +98,7 @@ vectorized: │ spans: FULL SCAN │ └── • scan + cluster nodes: actual row count: 5 KV rows read: 5 KV bytes read: 40 B diff --git a/pkg/sql/logictest/testdata/logic_test/explain_analyze b/pkg/sql/logictest/testdata/logic_test/explain_analyze index e7275d1aecd3..2c3218f7f6ae 100644 --- a/pkg/sql/logictest/testdata/logic_test/explain_analyze +++ b/pkg/sql/logictest/testdata/logic_test/explain_analyze @@ -15,6 +15,7 @@ distribution: vectorized: · • scan + cluster nodes: actual row count: 0 KV rows read: 0 KV bytes read: 0 B @@ -36,6 +37,7 @@ distribution: vectorized: · • scan + cluster nodes: actual row count: 3 KV rows read: 3 KV bytes read: 24 B @@ -59,6 +61,7 @@ vectorized: · • hash join (inner) │ columns: (k, v, a, b) +│ cluster nodes: │ actual row count: 2 │ estimated row count: 990 (missing stats) │ equality: (v) = (a) @@ -66,6 +69,7 @@ vectorized: │ ├── • scan │ columns: (k, v) +│ cluster nodes: │ actual row count: 4 │ KV rows read: 4 │ KV bytes read: 32 B @@ -75,6 +79,7 @@ vectorized: │ └── • scan columns: (a, b) + cluster nodes: actual row count: 3 KV rows read: 3 KV bytes read: 24 B diff --git a/pkg/sql/logictest/testdata/logic_test/explain_analyze_plans b/pkg/sql/logictest/testdata/logic_test/explain_analyze_plans index b937a454fa79..8bd1be1c73f2 100644 --- a/pkg/sql/logictest/testdata/logic_test/explain_analyze_plans +++ b/pkg/sql/logictest/testdata/logic_test/explain_analyze_plans @@ -66,17 +66,20 @@ distribution: vectorized: · • group +│ cluster nodes: │ actual row count: 5 │ group by: k │ ordered: +k │ └── • merge join + │ cluster nodes: │ actual row count: 5 │ equality: (k) = (k) │ left cols are key │ right cols are key │ ├── • scan + │ cluster nodes: │ actual row count: 5 │ KV rows read: 5 │ KV bytes read: 40 B @@ -85,6 +88,7 @@ vectorized: │ spans: FULL SCAN │ └── • scan + cluster nodes: actual row count: 5 KV rows read: 5 KV bytes read: 40 B @@ -106,19 +110,23 @@ distribution: vectorized: · • sort +│ cluster nodes: │ actual row count: 5 │ order: +w │ └── • distinct + │ cluster nodes: │ actual row count: 5 │ distinct on: w │ └── • hash join + │ cluster nodes: │ actual row count: 5 │ equality: (k) = (w) │ left cols are key │ ├── • scan + │ cluster nodes: │ actual row count: 5 │ KV rows read: 5 │ KV bytes read: 40 B @@ -127,6 +135,7 @@ vectorized: │ spans: FULL SCAN │ └── • scan + cluster nodes: actual row count: 5 KV rows read: 5 KV bytes read: 40 B @@ -148,12 +157,15 @@ distribution: vectorized: · • cross join +│ cluster nodes: │ actual row count: 25 │ ├── • ordinality +│ │ cluster nodes: │ │ actual row count: 5 │ │ │ └── • scan +│ cluster nodes: │ actual row count: 5 │ KV rows read: 5 │ KV bytes read: 40 B @@ -162,9 +174,11 @@ vectorized: │ spans: FULL SCAN │ └── • ordinality + │ cluster nodes: │ actual row count: 5 │ └── • scan + cluster nodes: actual row count: 5 KV rows read: 5 KV bytes read: 40 B @@ -206,9 +220,11 @@ distribution: vectorized: · • window +│ cluster nodes: │ actual row count: 5 │ └── • scan + cluster nodes: actual row count: 5 KV rows read: 5 KV bytes read: 40 B @@ -231,6 +247,7 @@ distribution: vectorized: · • scan + cluster nodes: actual row count: 0 KV rows read: 0 KV bytes read: 0 B @@ -259,6 +276,7 @@ vectorized: • root │ ├── • insert +│ │ cluster nodes: │ │ actual row count: 1 │ │ into: child(c, p) │ │ @@ -274,9 +292,11 @@ vectorized: │ │ exec mode: one row │ │ │ └── • group (scalar) +│ │ cluster nodes: │ │ actual row count: 1 │ │ │ └── • scan +│ cluster nodes: │ actual row count: 1 │ KV rows read: 1 │ KV bytes read: 8 B @@ -288,9 +308,11 @@ vectorized: └── • constraint-check │ └── • error if rows + │ cluster nodes: │ actual row count: 0 │ └── • lookup join (anti) + │ cluster nodes: │ actual row count: 0 │ KV rows read: 1 │ KV bytes read: 8 B @@ -299,10 +321,12 @@ vectorized: │ equality cols are key │ └── • filter + │ cluster nodes: │ actual row count: 1 │ filter: column2 IS NOT NULL │ └── • scan buffer + cluster nodes: actual row count: 1 label: buffer 1 · diff --git a/pkg/sql/logictest/testdata/logic_test/inverted_index_geospatial b/pkg/sql/logictest/testdata/logic_test/inverted_index_geospatial index 1cfe1f353c69..21fabaa3b6a4 100644 --- a/pkg/sql/logictest/testdata/logic_test/inverted_index_geospatial +++ b/pkg/sql/logictest/testdata/logic_test/inverted_index_geospatial @@ -34,25 +34,30 @@ distribution: vectorized: · • sort +│ cluster nodes: │ actual row count: 2 │ order: +k │ └── • filter + │ cluster nodes: │ actual row count: 2 │ filter: st_intersects('010100002026690000000000000C6A18410000008081844E41', geom) │ └── • index join + │ cluster nodes: │ actual row count: 2 │ KV rows read: 2 │ KV bytes read: 16 B │ table: geo_table@primary │ └── • inverted filter + │ cluster nodes: │ actual row count: 2 │ inverted column: geom_inverted_key │ num spans: 31 │ └── • scan + cluster nodes: actual row count: 2 KV rows read: 4 KV bytes read: 32 B @@ -104,25 +109,30 @@ distribution: vectorized: · • sort +│ cluster nodes: │ actual row count: 2 │ order: +k │ └── • filter + │ cluster nodes: │ actual row count: 2 │ filter: st_intersects('010100002026690000000000000C6A18410000008081844E41', geom) │ └── • index join + │ cluster nodes: │ actual row count: 2 │ KV rows read: 2 │ KV bytes read: 16 B │ table: geo_table@primary │ └── • inverted filter + │ cluster nodes: │ actual row count: 2 │ inverted column: geom_inverted_key │ num spans: 31 │ └── • scan + cluster nodes: actual row count: 2 KV rows read: 2 KV bytes read: 16 B @@ -150,25 +160,30 @@ distribution: vectorized: · • sort +│ cluster nodes: │ actual row count: 2 │ order: +k │ └── • filter + │ cluster nodes: │ actual row count: 2 │ filter: st_intersects('010100002026690000000000000C6A18410000008081844E41', geom) │ └── • index join + │ cluster nodes: │ actual row count: 2 │ KV rows read: 2 │ KV bytes read: 16 B │ table: geo_table@primary │ └── • inverted filter + │ cluster nodes: │ actual row count: 2 │ inverted column: geom_inverted_key │ num spans: 31 │ └── • scan + cluster nodes: actual row count: 2 KV rows read: 2 KV bytes read: 16 B diff --git a/pkg/sql/logictest/testdata/logic_test/vectorize_local b/pkg/sql/logictest/testdata/logic_test/vectorize_local index a2e42992156f..c83f3026a0bf 100644 --- a/pkg/sql/logictest/testdata/logic_test/vectorize_local +++ b/pkg/sql/logictest/testdata/logic_test/vectorize_local @@ -44,6 +44,7 @@ distribution: vectorized: · • scan + cluster nodes: actual row count: 2,001 KV rows read: 2,001 KV bytes read: 16 KiB @@ -64,6 +65,7 @@ distribution: vectorized: · • lookup join +│ cluster nodes: │ actual row count: 2 │ KV rows read: 1 │ KV bytes read: 8 B @@ -71,6 +73,7 @@ vectorized: │ equality: (b) = (b) │ └── • scan + cluster nodes: actual row count: 2 KV rows read: 2 KV bytes read: 16 B @@ -91,10 +94,12 @@ distribution: vectorized: · • merge join +│ cluster nodes: │ actual row count: 2 │ equality: (a) = (b) │ ├── • scan +│ cluster nodes: │ actual row count: 2 │ KV rows read: 2 │ KV bytes read: 16 B @@ -103,6 +108,7 @@ vectorized: │ spans: FULL SCAN │ └── • scan + cluster nodes: actual row count: 2 KV rows read: 2 KV bytes read: 16 B diff --git a/pkg/sql/opt/exec/explain/emit.go b/pkg/sql/opt/exec/explain/emit.go index 9596e2ee4d00..bd44a7d2fa1f 100644 --- a/pkg/sql/opt/exec/explain/emit.go +++ b/pkg/sql/opt/exec/explain/emit.go @@ -331,6 +331,9 @@ func (e *emitter) joinNodeName(algo string, joinType descpb.JoinType) string { func (e *emitter) emitNodeAttributes(n *Node) error { if stats, ok := n.annotations[exec.ExecutionStatsID]; ok { s := stats.(*exec.ExecutionStats) + if len(s.Nodes) > 0 { + e.ob.AddNonDeterministicField("cluster nodes", strings.Join(s.Nodes, ", ")) + } if s.RowCount.HasValue() { e.ob.AddField("actual row count", humanizeutil.Count(s.RowCount.Value())) } diff --git a/pkg/sql/opt/exec/explain/output.go b/pkg/sql/opt/exec/explain/output.go index 86a2c406f2f3..4a13417b9cdb 100644 --- a/pkg/sql/opt/exec/explain/output.go +++ b/pkg/sql/opt/exec/explain/output.go @@ -105,6 +105,15 @@ func (ob *OutputBuilder) AddField(key, value string) { ob.entries = append(ob.entries, entry{field: key, fieldVal: value}) } +// AddNonDeterministicField adds an information field under the current node, +// but hides the information if the MakeDeterministic flag is set. +func (ob *OutputBuilder) AddNonDeterministicField(key, value string) { + if ob.flags.MakeDeterministic { + value = "" + } + ob.AddField(key, value) +} + // Attr adds an information field under the current node. func (ob *OutputBuilder) Attr(key string, value interface{}) { ob.AddField(key, fmt.Sprint(value)) diff --git a/pkg/sql/opt/exec/factory.go b/pkg/sql/opt/exec/factory.go index d18179869a62..9be03805d27c 100644 --- a/pkg/sql/opt/exec/factory.go +++ b/pkg/sql/opt/exec/factory.go @@ -304,6 +304,9 @@ type ExecutionStats struct { KVBytesRead optional.Uint KVRowsRead optional.Uint + + // Nodes on which this operator was executed. + Nodes []string } // BuildPlanForExplainFn builds an execution plan against the given