sql: show distribution info based on actual physical plan in EXPLAIN
Previously, the `distribution` info in the `EXPLAIN` output was printed
based on the recommendation about how the plan should be distributed.
For example, if the recommendation was to distribute the plan, yet the
plan only contained a single flow on the gateway, we would still say
that the plan has "full" distribution.

This commit updates the code to print the distribution based on the
actual physical plan (in the example above it would say "local"),
regardless of the reason, be it a recommendation to plan locally or the
data simply happening to live only on the gateway.
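
To make that concrete, the single-flow example now shows up in the
EXPLAIN output roughly as follows (the table `t` and the point lookup
are taken from the logic tests updated below; the exact plan shape
depends on the schema):

  EXPLAIN SELECT * FROM t WHERE k=10
  ----
  distribution: local
  vectorized: true
  ·
  • scan
    ...

Previously the same plan was reported with "distribution: full".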

I think it makes more sense this way since a DISTSQL diagram consisting
of a single flow on the gateway now more appropriately corresponds to
"local" distribution. Additionally, this change is motivated by the
follow-up commit which will introduce changes to the physical plan
during plan finalization, and we want to show the correct distribution
in the EXPLAIN output for that case too.

Release note: None
yuzefovich committed Oct 4, 2021
1 parent ebca030 commit 4cc510b
Showing 24 changed files with 207 additions and 156 deletions.
30 changes: 15 additions & 15 deletions pkg/ccl/logictestccl/testdata/logic_test/zone
@@ -27,7 +27,7 @@ ALTER INDEX t@secondary CONFIGURE ZONE USING constraints='[+region=test,+dc=dc1]
query T retry
EXPLAIN SELECT * FROM t WHERE k=10
----
distribution: full
distribution: local
vectorized: true
·
• scan
@@ -74,7 +74,7 @@ ALTER INDEX t@tertiary CONFIGURE ZONE USING constraints='[+region=test,+dc=dc1]'
query T retry
EXPLAIN SELECT * FROM t WHERE k=10
----
distribution: full
distribution: local
vectorized: true
·
• scan
@@ -121,7 +121,7 @@ ALTER INDEX t@tertiary CONFIGURE ZONE USING constraints='[+region=test,+dc=dc3]'
query T retry
EXPLAIN SELECT * FROM t WHERE k=10
----
distribution: full
distribution: local
vectorized: true
·
• scan
@@ -143,7 +143,7 @@ ALTER INDEX t@secondary CONFIGURE ZONE USING constraints='[+region=test,+dc=dc2]
query T retry
EXPLAIN SELECT * FROM t WHERE k=10
----
distribution: full
distribution: local
vectorized: true
·
• scan
@@ -187,7 +187,7 @@ PREPARE p AS SELECT * FROM [EXPLAIN SELECT k, v FROM t WHERE k=10]
query T retry
EXECUTE p
----
distribution: full
distribution: local
vectorized: true
·
• scan
@@ -204,7 +204,7 @@ ALTER INDEX t@secondary CONFIGURE ZONE USING constraints='[+region=test,+dc=dc1]
query T retry
EXECUTE p
----
distribution: full
distribution: local
vectorized: true
·
• scan
@@ -231,7 +231,7 @@ USING constraints='[+region=test]', lease_preferences='[[+region=test,+dc=dc1]]'
query T retry
EXPLAIN SELECT * FROM t WHERE k=10
----
distribution: full
distribution: local
vectorized: true
·
• scan
@@ -255,7 +255,7 @@ USING constraints='[+region=test]', lease_preferences='[[+region=test,+dc=dc1]]'
query T retry
EXPLAIN SELECT * FROM t WHERE k=10
----
distribution: full
distribution: local
vectorized: true
·
• scan
@@ -312,7 +312,7 @@ USING constraints='[+region=test]', lease_preferences='[[+region=test,+dc=dc1]]'
query T retry
EXPLAIN SELECT * FROM t WHERE k=10
----
distribution: full
distribution: local
vectorized: true
·
• scan
@@ -359,7 +359,7 @@ PREPARE p AS SELECT * FROM [EXPLAIN SELECT k, v FROM t WHERE k=10]
query T retry
EXECUTE p
----
distribution: full
distribution: local
vectorized: true
·
• scan
@@ -374,7 +374,7 @@ USING constraints='[+region=test]', lease_preferences='[[+region=test,+dc=dc2]]'
query T retry
EXECUTE p
----
distribution: full
distribution: local
vectorized: true
·
• scan
@@ -406,7 +406,7 @@ ALTER INDEX t36642@secondary CONFIGURE ZONE USING constraints='[+region=test]',
query T retry
EXPLAIN SELECT * FROM t36642 WHERE k=10
----
distribution: full
distribution: local
vectorized: true
·
• scan
@@ -423,7 +423,7 @@ ALTER INDEX t36642@secondary CONFIGURE ZONE USING constraints='[+region=test]',
query T retry
EXPLAIN SELECT * FROM t36642 WHERE k=10
----
distribution: full
distribution: local
vectorized: true
·
• scan
@@ -481,7 +481,7 @@ CONFIGURE ZONE USING constraints='[+region=test]', lease_preferences='[[+dc=dc1]
query T retry
EXPLAIN SELECT * FROM t36644 WHERE k=10
----
distribution: full
distribution: local
vectorized: true
·
• scan
@@ -499,7 +499,7 @@ CONFIGURE ZONE USING constraints='[+region=test]', lease_preferences='[[+dc=dc1]
query T retry
EXPLAIN SELECT * FROM t36644 WHERE k=10
----
distribution: full
distribution: local
vectorized: true
·
• scan
2 changes: 1 addition & 1 deletion pkg/cli/clisqlshell/sql_test.go
@@ -216,7 +216,7 @@ func Example_misc_table() {
// sql --format=table -e explain select s, 'foo' from t.t
// info
// --------------------------
// distribution: full
// distribution: local
// vectorized: true
//
// • render
34 changes: 19 additions & 15 deletions pkg/sql/distsql_physical_planner.go
@@ -1646,13 +1646,14 @@ func (dsp *DistSQLPlanner) addSorters(
// needed to perform the physical planning of aggregators once the specs have
// been created.
type aggregatorPlanningInfo struct {
aggregations []execinfrapb.AggregatorSpec_Aggregation
argumentsColumnTypes [][]*types.T
isScalar bool
groupCols []int
groupColOrdering colinfo.ColumnOrdering
inputMergeOrdering execinfrapb.Ordering
reqOrdering ReqOrdering
aggregations []execinfrapb.AggregatorSpec_Aggregation
argumentsColumnTypes [][]*types.T
isScalar bool
groupCols []int
groupColOrdering colinfo.ColumnOrdering
inputMergeOrdering execinfrapb.Ordering
reqOrdering ReqOrdering
allowPartialDistribution bool
}

// addAggregators adds aggregators corresponding to a groupNode and updates the plan to
@@ -2175,7 +2176,7 @@ func (dsp *DistSQLPlanner) planAggregators(

// We have multiple streams, so we definitely have a processor planned
// on a remote node.
stageID := p.NewStage(true /* containsRemoteProcessor */)
stageID := p.NewStage(true /* containsRemoteProcessor */, info.allowPartialDistribution)

// We have one final stage processor for each result router. This is a
// somewhat arbitrary decision; we could have a different number of nodes
@@ -2845,7 +2846,7 @@ func (dsp *DistSQLPlanner) planJoiners(
p := planCtx.NewPhysicalPlan()
physicalplan.MergePlans(
&p.PhysicalPlan, &info.leftPlan.PhysicalPlan, &info.rightPlan.PhysicalPlan,
info.leftPlanDistribution, info.rightPlanDistribution,
info.leftPlanDistribution, info.rightPlanDistribution, info.allowPartialDistribution,
)
leftRouters := info.leftPlan.ResultRouters
rightRouters := info.rightPlan.ResultRouters
@@ -3005,7 +3006,7 @@ func (dsp *DistSQLPlanner) createPhysPlanForPlanNode(

case *valuesNode:
if mustWrapValuesNode(planCtx, n.specifiedInQuery) {
plan, err = dsp.wrapPlan(planCtx, n)
plan, err = dsp.wrapPlan(planCtx, n, false /* allowPartialDistribution */)
} else {
colTypes := getTypesFromResultColumns(n.columns)
var spec *execinfrapb.ValuesCoreSpec
@@ -3027,7 +3028,7 @@

case *createStatsNode:
if n.runAsJob {
plan, err = dsp.wrapPlan(planCtx, n)
plan, err = dsp.wrapPlan(planCtx, n, false /* allowPartialDistribution */)
} else {
// Create a job record but don't actually start the job.
var record *jobs.Record
@@ -3041,7 +3042,7 @@ func (dsp *DistSQLPlanner) createPhysPlanForPlanNode(

default:
// Can't handle a node? We wrap it and continue on our way.
plan, err = dsp.wrapPlan(planCtx, n)
plan, err = dsp.wrapPlan(planCtx, n, false /* allowPartialDistribution */)
}

if err != nil {
@@ -3086,7 +3087,9 @@ func (dsp *DistSQLPlanner) createPhysPlanForPlanNode(
// will create a planNodeToRowSource wrapper for the sub-tree that's not
// plannable by DistSQL. If that sub-tree has DistSQL-plannable sources, they
// will be planned by DistSQL and connected to the wrapper.
func (dsp *DistSQLPlanner) wrapPlan(planCtx *PlanningCtx, n planNode) (*PhysicalPlan, error) {
func (dsp *DistSQLPlanner) wrapPlan(
planCtx *PlanningCtx, n planNode, allowPartialDistribution bool,
) (*PhysicalPlan, error) {
useFastPath := planCtx.planDepth == 1 && planCtx.stmtType == tree.RowsAffected

// First, we search the planNode tree we're trying to wrap for the first
@@ -3177,7 +3180,7 @@ func (dsp *DistSQLPlanner) wrapPlan(planCtx *PlanningCtx, n planNode) (*Physical
Type: execinfrapb.OutputRouterSpec_PASS_THROUGH,
}},
// This stage consists of a single processor planned on the gateway.
StageID: p.NewStage(false /* containsRemoteProcessor */),
StageID: p.NewStage(false /* containsRemoteProcessor */, allowPartialDistribution),
ResultTypes: wrapper.outputTypes,
},
}
@@ -3597,6 +3600,7 @@ func (dsp *DistSQLPlanner) createPlanForSetOp(
// the distribution of the whole plans.
leftPlan.GetLastStageDistribution(),
rightPlan.GetLastStageDistribution(),
false, /* allowPartialDistribution */
)

if n.unionType == tree.UnionOp {
@@ -3811,7 +3815,7 @@ func (dsp *DistSQLPlanner) createPlanForWindow(
}
// We have multiple streams, so we definitely have a processor planned
// on a remote node.
stageID := plan.NewStage(true /* containsRemoteProcessor */)
stageID := plan.NewStage(true /* containsRemoteProcessor */, false /* allowPartialDistribution */)

// We put a windower on each node and we connect it
// with all hash routers from the previous stage in
1 change: 1 addition & 0 deletions pkg/sql/distsql_plan_join.go
@@ -41,6 +41,7 @@ type joinPlanningInfo struct {
// then a hash join is planned.
leftMergeOrd, rightMergeOrd execinfrapb.Ordering
leftPlanDistribution, rightPlanDistribution physicalplan.PlanDistribution
allowPartialDistribution bool
}

// makeCoreSpec creates a processor core for hash and merge joins based on the
39 changes: 20 additions & 19 deletions pkg/sql/distsql_spec_exec_factory.go
@@ -129,7 +129,7 @@ func (e *distSQLSpecExecFactory) ConstructValues(
specifiedInQuery: true,
}
planNodesToClose = []planNode{v}
physPlan, err = e.dsp.wrapPlan(planCtx, v)
physPlan, err = e.dsp.wrapPlan(planCtx, v, e.planningMode != distSQLLocalOnlyPlanning)
} else {
// We can create a spec for the values processor, so we don't create a
// valuesNode.
@@ -158,7 +158,7 @@ func (e *distSQLSpecExecFactory) ConstructScan(
return constructVirtualScan(
e, e.planner, table, index, params, reqOrdering,
func(d *delayedNode) (exec.Node, error) {
physPlan, err := e.dsp.wrapPlan(e.getPlanCtx(cannotDistribute), d)
physPlan, err := e.dsp.wrapPlan(e.getPlanCtx(cannotDistribute), d, e.planningMode != distSQLLocalOnlyPlanning)
if err != nil {
return nil, err
}
@@ -830,7 +830,7 @@ func (e *distSQLSpecExecFactory) ConstructExplain(
}
}

physPlan, err := e.dsp.wrapPlan(e.getPlanCtx(cannotDistribute), explainNode)
physPlan, err := e.dsp.wrapPlan(e.getPlanCtx(cannotDistribute), explainNode, e.planningMode != distSQLLocalOnlyPlanning)
if err != nil {
return nil, err
}
@@ -971,7 +971,7 @@ func (e *distSQLSpecExecFactory) ConstructOpaque(metadata opt.OpaqueMetadata) (e
if err != nil {
return nil, err
}
physPlan, err := e.dsp.wrapPlan(e.getPlanCtx(cannotDistribute), plan)
physPlan, err := e.dsp.wrapPlan(e.getPlanCtx(cannotDistribute), plan, e.planningMode != distSQLLocalOnlyPlanning)
if err != nil {
return nil, err
}
@@ -1090,21 +1090,22 @@ func (e *distSQLSpecExecFactory) constructHashOrMergeJoin(
leftEqColsRemapped := eqCols(leftEqCols, leftMap)
rightEqColsRemapped := eqCols(rightEqCols, rightMap)
info := joinPlanningInfo{
leftPlan: leftPhysPlan,
rightPlan: rightPhysPlan,
joinType: joinType,
joinResultTypes: getTypesFromResultColumns(resultColumns),
onExpr: onExpr,
post: post,
joinToStreamColMap: joinToStreamColMap,
leftEqCols: leftEqColsRemapped,
rightEqCols: rightEqColsRemapped,
leftEqColsAreKey: leftEqColsAreKey,
rightEqColsAreKey: rightEqColsAreKey,
leftMergeOrd: distsqlOrdering(mergeJoinOrdering, leftEqColsRemapped),
rightMergeOrd: distsqlOrdering(mergeJoinOrdering, rightEqColsRemapped),
leftPlanDistribution: leftPhysPlan.Distribution,
rightPlanDistribution: rightPhysPlan.Distribution,
leftPlan: leftPhysPlan,
rightPlan: rightPhysPlan,
joinType: joinType,
joinResultTypes: getTypesFromResultColumns(resultColumns),
onExpr: onExpr,
post: post,
joinToStreamColMap: joinToStreamColMap,
leftEqCols: leftEqColsRemapped,
rightEqCols: rightEqColsRemapped,
leftEqColsAreKey: leftEqColsAreKey,
rightEqColsAreKey: rightEqColsAreKey,
leftMergeOrd: distsqlOrdering(mergeJoinOrdering, leftEqColsRemapped),
rightMergeOrd: distsqlOrdering(mergeJoinOrdering, rightEqColsRemapped),
leftPlanDistribution: leftPhysPlan.Distribution,
rightPlanDistribution: rightPhysPlan.Distribution,
allowPartialDistribution: e.planningMode != distSQLLocalOnlyPlanning,
}
p := e.dsp.planJoiners(planCtx, &info, ReqOrdering(reqOrdering))
p.ResultColumns = resultColumns
6 changes: 5 additions & 1 deletion pkg/sql/explain_plan.go
@@ -53,11 +53,13 @@ func (e *explainPlanNode) startExec(params runParams) error {
// Determine the "distribution" and "vectorized" values, which we will emit as
// special rows.

// Note that we delay adding the annotation about the distribution until
// after the plan is finalized (when the physical plan is successfully
// created).
distribution := getPlanDistribution(
params.ctx, params.p, params.extendedEvalCtx.ExecCfg.NodeID,
params.extendedEvalCtx.SessionData().DistSQLMode, plan.main,
)
ob.AddDistribution(distribution.String())

outerSubqueries := params.p.curPlan.subqueryPlans
distSQLPlanner := params.extendedEvalCtx.DistSQLPlanner
@@ -76,11 +78,13 @@ }
}
return err
}
ob.AddDistribution(distribution.String())
// For regular EXPLAIN, simply skip emitting the "vectorized" information.
} else {
// There might be an issue making the physical plan, but that should not
// cause an error or panic, so swallow the error. See #40677 for example.
distSQLPlanner.FinalizePlan(planCtx, physicalPlan)
ob.AddDistribution(physicalPlan.Distribution.String())
flows := physicalPlan.GenerateFlowSpecs()
flowCtx := newFlowCtxForExplainPurposes(planCtx, params.p)

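
The explain_plan.go hunk above is the core of the change: the
`distribution` row is now emitted from `physicalPlan.Distribution` after
`FinalizePlan`, rather than from the upfront `getPlanDistribution`
recommendation. The standalone Go sketch below illustrates the principle
only; the types are simplified, hypothetical stand-ins, not the real
PhysicalPlan and PlanDistribution:

package main

import "fmt"

// planDistribution is a simplified stand-in for the planner's notion of
// how distributed a plan is.
type planDistribution int

const (
	localDistribution planDistribution = iota
	partialDistribution
	fullDistribution
)

func (d planDistribution) String() string {
	return [...]string{"local", "partial", "full"}[d]
}

// physicalPlan is a simplified stand-in for the finalized physical plan.
type physicalPlan struct {
	distribution planDistribution
}

// finalize mimics physical planning and finalization: the plan that
// actually runs may end up less distributed than the recommendation
// suggested, e.g. because all of the data lived on the gateway and only
// a single local flow was planned.
func finalize(recommendation planDistribution, hasRemoteFlows bool) physicalPlan {
	if !hasRemoteFlows {
		return physicalPlan{distribution: localDistribution}
	}
	return physicalPlan{distribution: recommendation}
}

func main() {
	recommendation := fullDistribution // "should be distributed"
	plan := finalize(recommendation, false /* hasRemoteFlows */)

	// Old behavior: EXPLAIN reported the recommendation.
	fmt.Println("distribution:", recommendation) // distribution: full
	// New behavior: EXPLAIN reports what the physical plan actually does.
	fmt.Println("distribution:", plan.distribution) // distribution: local
}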
2 changes: 1 addition & 1 deletion pkg/sql/explain_vec.go
@@ -47,7 +47,6 @@ func (n *explainVecNode) startExec(params runParams) error {
params.ctx, params.p, params.extendedEvalCtx.ExecCfg.NodeID,
params.extendedEvalCtx.SessionData().DistSQLMode, n.plan.main,
)
willDistribute := distribution.WillDistribute()
outerSubqueries := params.p.curPlan.subqueryPlans
planCtx := newPlanningCtxForExplainPurposes(distSQLPlanner, params, n.plan.subqueryPlans, distribution)
defer func() {
@@ -76,6 +75,7 @@ return errors.New("vectorize is set to 'off'")
return errors.New("vectorize is set to 'off'")
}
verbose := n.options.Flags[tree.ExplainFlagVerbose]
willDistribute := physPlan.Distribution.WillDistribute()
n.run.lines, n.run.cleanup, err = colflow.ExplainVec(
params.ctx, flowCtx, flows, physPlan.LocalProcessors, nil, /* opChains */
distSQLPlanner.gatewayNodeID, verbose, willDistribute,
(The remaining changed files in this commit are not shown.)
