sql: change physical planning heuristics a bit to prefer local execution
This commit changes two parts of the physical planner heuristics:
- we now say that the lookup join "can be distributed" rather than
  "should be distributed"
- for a top K sort we also say that it "can be" rather than "should be"
  distributed.

I'm not certain whether both of these changes are always beneficial, but
here is some justification.
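
To make the "can be distributed" vs. "should be distributed" distinction
concrete, here is a minimal sketch of how the planner's recommendation
values might be modeled and combined. The identifiers (distRecommendation,
cannotDistribute, canDistribute, shouldDistribute, compose) appear in
pkg/sql/distsql_physical_planner.go, but the enum ordering and the body of
compose below are my approximation, not the actual implementation.

// Hedged sketch: an approximation of the planner's distribution
// recommendations and of how recommendations for two parts of a plan
// combine.
package main

import "fmt"

type distRecommendation int

const (
    // cannotDistribute: this part of the plan cannot be distributed.
    cannotDistribute distRecommendation = iota
    // canDistribute: distribution is possible but not specifically requested.
    canDistribute
    // shouldDistribute: this part of the plan is expected to benefit from
    // being distributed.
    shouldDistribute
)

// compose combines the recommendations for two parts of a plan: if either
// part cannot be distributed, neither can the whole plan; otherwise the
// stronger preference wins.
func (a distRecommendation) compose(b distRecommendation) distRecommendation {
    if a == cannotDistribute || b == cannotDistribute {
        return cannotDistribute
    }
    if a == shouldDistribute || b == shouldDistribute {
        return shouldDistribute
    }
    return canDistribute
}

func main() {
    // A lookup join downgraded to canDistribute no longer forces
    // distribution by itself...
    fmt.Println(canDistribute.compose(canDistribute) == shouldDistribute) // false
    // ...but a shouldDistribute stage elsewhere in the plan still wins.
    fmt.Println(canDistribute.compose(shouldDistribute) == shouldDistribute) // true
}

Under this model, downgrading a stage from "should" to "can" removes its
vote for distribution without preventing other stages from casting theirs.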

The change to the lookup join heuristic makes it so that, in
`distsql=auto` mode, the distribution of the join reader stage is
determined by the other stages of the physical plan. Consider an
example in which the input to the lookup join is a table reader that
scans only a handful of rows. Previously, because of the "should
distribute" heuristic, such a plan would be "distributed", meaning we
would plan a single table reader on the leaseholder for the relevant
range (most likely a remote node from the perspective of the gateway
node for the query); this, in turn, would force the join reader to be
planned on the same node, and all subsequent stages, if any, as well.
Such a decision can create a hotspot if that particular range is
queried often (think of an append-only access pattern where the latest
data is accessed most frequently).

With this change, in such a scenario we get more even compute
utilization across the cluster because the flow will be fully planned
on the gateway (which is assumed to be chosen randomly by a load
balancer), and the lookup join will be performed from the gateway
(we'll still need to perform a remote read from the leaseholder of
that single range).
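
For reference, here is a hedged sketch of how a recommendation might then
be turned into an actual distribution decision per distsql mode, assuming
the usual off/on/auto modes. planDistributionSketch is a hypothetical
helper reusing the distRecommendation values from the sketch above; the
real logic lives in getPlanDistribution in pkg/sql/exec_util.go, returns a
physicalplan.PlanDistribution rather than a string, and handles more cases.

// Hedged sketch, not the real getPlanDistribution: roughly how a
// recommendation might map to a distribution decision per distsql mode.
func planDistributionSketch(mode string, rec distRecommendation) string {
    switch mode {
    case "off":
        return "local"
    case "on":
        if rec == cannotDistribute {
            return "local"
        }
        return "distributed"
    case "auto":
        // Only an explicit shouldDistribute triggers distribution in auto
        // mode; a bare canDistribute keeps the whole flow on the gateway.
        if rec == shouldDistribute {
            return "distributed"
        }
        return "local"
    default:
        return "local"
    }
}

With the lookup join now reporting only canDistribute, a small table
reader input no longer drags the whole flow onto the leaseholder's node
in auto mode unless some other stage still says "should distribute".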

The change to the top K sort heuristic seems less controversial to me,
yet I don't have a strong justification for it. My feeling is that the
value of K is usually small, so it's OK not to "force" ourselves to
distribute the sort operation if the physical plan otherwise isn't
calling for it.

Overall, the choice of changing these two heuristics isn't very
principled and is driven by a single query from one of our largest
customers that happened to hit the hotspot scenario described above.
In their case, an append-like workload constantly updates a single
range. Eventually that range is split automatically, but both new
ranges stay on the same node. The latest data is accessed far more
frequently than any other data in the table, yet according to the KV
heuristics the ranges aren't being reallocated because the scans
hitting the hot ranges don't exceed the threshold. What isn't
accounted for is the fact that other parts of the flow are far more
compute-intensive, so this change attempts to alleviate such a
hot-node scenario.

Release note (sql change): Some queries with lookup joins and/or top
K sorts are now more likely to be executed in a "local" manner with
the `distsql=auto` session variable when the newly introduced
`sql.distsql.prefer_local_execution.enabled` cluster setting is set to
`true` (`false` is the default).
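
As a usage illustration only (not part of this commit), the setting can
be toggled from any SQL client; the Go snippet below does the same thing
the new logic test file does with SET statements. The connection string
and the lib/pq driver choice are placeholders, not project requirements.

// Hedged usage sketch: opt in to the local-execution preference and make
// sure automatic distribution mode is in effect for the session.
package main

import (
    "database/sql"
    "log"

    _ "github.com/lib/pq" // placeholder Postgres-wire driver
)

func main() {
    // Placeholder connection string for a locally running cluster.
    db, err := sql.Open("postgres",
        "postgresql://root@localhost:26257/defaultdb?sslmode=disable")
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    // The cluster setting defaults to false; opt in explicitly.
    if _, err := db.Exec(
        `SET CLUSTER SETTING sql.distsql.prefer_local_execution.enabled = true`,
    ); err != nil {
        log.Fatal(err)
    }
    // The changed heuristics only matter in distsql=auto mode.
    if _, err := db.Exec(`SET distsql = auto`); err != nil {
        log.Fatal(err)
    }
}
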
yuzefovich committed Aug 9, 2021
1 parent f7dbae2 commit 0dd036b
Showing 9 changed files with 117 additions and 31 deletions.
pkg/sql/apply_join.go: 2 additions & 1 deletion
@@ -287,7 +287,8 @@ func runPlanInsidePlan(
evalCtx := params.p.ExtendedEvalContextCopy()
plannerCopy := *params.p
distributePlan := getPlanDistribution(
- params.ctx, &plannerCopy, plannerCopy.execCfg.NodeID, plannerCopy.SessionData().DistSQLMode, plan.main,
+ params.ctx, &plannerCopy, plannerCopy.execCfg.NodeID, plannerCopy.SessionData().DistSQLMode,
+ &evalCtx.Settings.SV, plan.main,
)
planCtx := params.p.extendedEvalCtx.ExecCfg.DistSQLPlanner.NewPlanningCtx(
params.ctx, evalCtx, &plannerCopy, params.p.txn, distributePlan.WillDistribute(),
pkg/sql/conn_executor_exec.go: 2 additions & 1 deletion
@@ -874,7 +874,8 @@ func (ex *connExecutor) dispatchToExecutionEngine(

ex.sessionTracing.TracePlanCheckStart(ctx)
distributePlan := getPlanDistribution(
- ctx, planner, planner.execCfg.NodeID, ex.sessionData.DistSQLMode, planner.curPlan.main,
+ ctx, planner, planner.execCfg.NodeID, ex.sessionData.DistSQLMode,
+ &planner.extendedEvalCtx.EvalContext.Settings.SV, planner.curPlan.main,
)
ex.sessionTracing.TracePlanCheckEnd(ctx, nil, distributePlan.WillDistribute())

pkg/sql/distsql_physical_planner.go: 47 additions & 23 deletions
@@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
@@ -357,28 +358,43 @@ func mustWrapValuesNode(planCtx *PlanningCtx, specifiedInQuery bool) bool {
return false
}

+ var preferLocalExecution = settings.RegisterBoolSetting(
+ `sql.distsql.prefer_local_execution.enabled`,
+ "setting to true makes the DistSQL physical planner heuristics prefer "+
+ "the local execution slightly in some cases",
+ false,
+ )

// checkSupportForPlanNode returns a distRecommendation (as described above) or
// cannotDistribute and an error if the plan subtree is not distributable.
// The error doesn't indicate complete failure - it's instead the reason that
// this plan couldn't be distributed.
// TODO(radu): add tests for this.
- func checkSupportForPlanNode(node planNode) (distRecommendation, error) {
+ //
+ // - preferLocalExecution indicates whether the heuristics should favor local
+ // execution in some cases (namely, lookup joins and top K sorts are preferred
+ // to be executed locally in distsql=auto mode). Note that if a stage of a
+ // physical plan is deemed as "should be distributed", preferLocalExecution is
+ // ignored.
+ func checkSupportForPlanNode(
+ node planNode, outputNodeHasLimit bool, preferLocalExecution bool,
+ ) (distRecommendation, error) {
switch n := node.(type) {
// Keep these cases alphabetized, please!
case *distinctNode:
- return checkSupportForPlanNode(n.plan)
+ return checkSupportForPlanNode(n.plan, false /* outputNodeHasLimit */, preferLocalExecution)

case *exportNode:
- return checkSupportForPlanNode(n.source)
+ return checkSupportForPlanNode(n.source, false /* outputNodeHasLimit */, preferLocalExecution)

case *filterNode:
if err := checkExpr(n.filter); err != nil {
return cannotDistribute, err
}
- return checkSupportForPlanNode(n.source.plan)
+ return checkSupportForPlanNode(n.source.plan, false /* outputNodeHasLimit */, preferLocalExecution)

case *groupNode:
- rec, err := checkSupportForPlanNode(n.plan)
+ rec, err := checkSupportForPlanNode(n.plan, false /* outputNodeHasLimit */, preferLocalExecution)
if err != nil {
return cannotDistribute, err
}
@@ -388,19 +404,19 @@ func checkSupportForPlanNode(node planNode) (distRecommendation, error) {
case *indexJoinNode:
// n.table doesn't have meaningful spans, but we need to check support (e.g.
// for any filtering expression).
- if _, err := checkSupportForPlanNode(n.table); err != nil {
+ if _, err := checkSupportForPlanNode(n.table, false /* outputNodeHasLimit */, preferLocalExecution); err != nil {
return cannotDistribute, err
}
- return checkSupportForPlanNode(n.input)
+ return checkSupportForPlanNode(n.input, false /* outputNodeHasLimit */, preferLocalExecution)

case *invertedFilterNode:
- return checkSupportForInvertedFilterNode(n)
+ return checkSupportForInvertedFilterNode(n, preferLocalExecution)

case *invertedJoinNode:
if err := checkExpr(n.onExpr); err != nil {
return cannotDistribute, err
}
- rec, err := checkSupportForPlanNode(n.input)
+ rec, err := checkSupportForPlanNode(n.input, false /* outputNodeHasLimit */, preferLocalExecution)
if err != nil {
return cannotDistribute, err
}
@@ -410,11 +426,11 @@ func checkSupportForPlanNode(node planNode) (distRecommendation, error) {
if err := checkExpr(n.pred.onCond); err != nil {
return cannotDistribute, err
}
- recLeft, err := checkSupportForPlanNode(n.left.plan)
+ recLeft, err := checkSupportForPlanNode(n.left.plan, false /* outputNodeHasLimit */, preferLocalExecution)
if err != nil {
return cannotDistribute, err
}
- recRight, err := checkSupportForPlanNode(n.right.plan)
+ recRight, err := checkSupportForPlanNode(n.right.plan, false /* outputNodeHasLimit */, preferLocalExecution)
if err != nil {
return cannotDistribute, err
}
@@ -431,7 +447,7 @@ func checkSupportForPlanNode(node planNode) (distRecommendation, error) {
// Note that we don't need to check whether we support distribution of
// n.countExpr or n.offsetExpr because those expressions are evaluated
// locally, during the physical planning.
- return checkSupportForPlanNode(n.plan)
+ return checkSupportForPlanNode(n.plan, true /* outputNodeHasLimit */, preferLocalExecution)

case *lookupJoinNode:
if n.table.lockingStrength != descpb.ScanLockingStrength_FOR_NONE {
@@ -448,10 +464,13 @@ func checkSupportForPlanNode(node planNode) (distRecommendation, error) {
if err := checkExpr(n.onCond); err != nil {
return cannotDistribute, err
}
- rec, err := checkSupportForPlanNode(n.input)
+ rec, err := checkSupportForPlanNode(n.input, false /* outputNodeHasLimit */, preferLocalExecution)
if err != nil {
return cannotDistribute, err
}
+ if preferLocalExecution {
+ return rec.compose(canDistribute), nil
+ }
return rec.compose(shouldDistribute), nil

case *ordinalityNode:
@@ -460,15 +479,15 @@ func checkSupportForPlanNode(node planNode) (distRecommendation, error) {
return cannotDistribute, nil

case *projectSetNode:
- return checkSupportForPlanNode(n.source)
+ return checkSupportForPlanNode(n.source, false /* outputNodeHasLimit */, preferLocalExecution)

case *renderNode:
for _, e := range n.render {
if err := checkExpr(e); err != nil {
return cannotDistribute, err
}
}
- return checkSupportForPlanNode(n.source.plan)
+ return checkSupportForPlanNode(n.source.plan, outputNodeHasLimit, preferLocalExecution)

case *scanNode:
if n.lockingStrength != descpb.ScanLockingStrength_FOR_NONE {
@@ -497,23 +516,26 @@ func checkSupportForPlanNode(node planNode) (distRecommendation, error) {
}

case *sortNode:
- rec, err := checkSupportForPlanNode(n.plan)
+ rec, err := checkSupportForPlanNode(n.plan, false /* outputNodeHasLimit */, preferLocalExecution)
if err != nil {
return cannotDistribute, err
}
+ if outputNodeHasLimit && preferLocalExecution {
+ // If we have a top K sort, we can distribute the query.
+ return rec.compose(canDistribute), nil
+ }
// If we have to sort, distribute the query.
- rec = rec.compose(shouldDistribute)
- return rec, nil
+ return rec.compose(shouldDistribute), nil

case *unaryNode:
return canDistribute, nil

case *unionNode:
- recLeft, err := checkSupportForPlanNode(n.left)
+ recLeft, err := checkSupportForPlanNode(n.left, false /* outputNodeHasLimit */, preferLocalExecution)
if err != nil {
return cannotDistribute, err
}
- recRight, err := checkSupportForPlanNode(n.right)
+ recRight, err := checkSupportForPlanNode(n.right, false /* outputNodeHasLimit */, preferLocalExecution)
if err != nil {
return cannotDistribute, err
}
@@ -537,7 +559,7 @@ func checkSupportForPlanNode(node planNode) (distRecommendation, error) {
return canDistribute, nil

case *windowNode:
- return checkSupportForPlanNode(n.plan)
+ return checkSupportForPlanNode(n.plan, false /* outputNodeHasLimit */, preferLocalExecution)

case *zeroNode:
return canDistribute, nil
@@ -559,8 +581,10 @@ func checkSupportForPlanNode(node planNode) (distRecommendation, error) {
}
}

- func checkSupportForInvertedFilterNode(n *invertedFilterNode) (distRecommendation, error) {
- rec, err := checkSupportForPlanNode(n.input)
+ func checkSupportForInvertedFilterNode(
+ n *invertedFilterNode, preferLocalExecution bool,
+ ) (distRecommendation, error) {
+ rec, err := checkSupportForPlanNode(n.input, false /* outputNodeHasLimit */, preferLocalExecution)
if err != nil {
return cannotDistribute, err
}
pkg/sql/distsql_running.go: 4 additions & 2 deletions
@@ -1039,7 +1039,8 @@ func (dsp *DistSQLPlanner) planAndRunSubquery(
defer subqueryMemAccount.Close(ctx)

distributeSubquery := getPlanDistribution(
- ctx, planner, planner.execCfg.NodeID, planner.SessionData().DistSQLMode, subqueryPlan.plan,
+ ctx, planner, planner.execCfg.NodeID, planner.SessionData().DistSQLMode,
+ &evalCtx.Settings.SV, subqueryPlan.plan,
).WillDistribute()
subqueryPlanCtx := dsp.NewPlanningCtx(ctx, evalCtx, planner, planner.txn, distributeSubquery)
subqueryPlanCtx.stmtType = tree.Rows
@@ -1336,7 +1337,8 @@ func (dsp *DistSQLPlanner) planAndRunPostquery(
defer postqueryMemAccount.Close(ctx)

distributePostquery := getPlanDistribution(
- ctx, planner, planner.execCfg.NodeID, planner.SessionData().DistSQLMode, postqueryPlan,
+ ctx, planner, planner.execCfg.NodeID, planner.SessionData().DistSQLMode,
+ &evalCtx.Settings.SV, postqueryPlan,
).WillDistribute()
postqueryPlanCtx := dsp.NewPlanningCtx(ctx, evalCtx, planner, planner.txn, distributePostquery)
postqueryPlanCtx.stmtType = tree.Rows
pkg/sql/exec_util.go: 2 additions & 1 deletion
@@ -1125,6 +1125,7 @@ func getPlanDistribution(
p *planner,
nodeID *base.SQLIDContainer,
distSQLMode sessiondata.DistSQLExecMode,
+ sv *settings.Values,
plan planMaybePhysical,
) physicalplan.PlanDistribution {
if plan.isPhysicalPlan() {
@@ -1150,7 +1151,7 @@ func getPlanDistribution(
return physicalplan.LocalPlan
}

- rec, err := checkSupportForPlanNode(plan.planNode)
+ rec, err := checkSupportForPlanNode(plan.planNode, false /* outputNodeHasLimit */, preferLocalExecution.Get(sv))
if err != nil {
// Don't use distSQL for this request.
log.VEventf(ctx, 1, "query not supported for distSQL: %s", err)
pkg/sql/explain_plan.go: 2 additions & 1 deletion
@@ -55,7 +55,8 @@ func (e *explainPlanNode) startExec(params runParams) error {

distribution := getPlanDistribution(
params.ctx, params.p, params.extendedEvalCtx.ExecCfg.NodeID,
- params.extendedEvalCtx.SessionData.DistSQLMode, plan.main,
+ params.extendedEvalCtx.SessionData.DistSQLMode,
+ &params.extendedEvalCtx.Settings.SV, plan.main,
)
ob.AddDistribution(distribution.String())

pkg/sql/explain_vec.go: 2 additions & 1 deletion
@@ -55,7 +55,8 @@ func (n *explainVecNode) startExec(params runParams) error {
distSQLPlanner := params.extendedEvalCtx.DistSQLPlanner
distribution := getPlanDistribution(
params.ctx, params.p, params.extendedEvalCtx.ExecCfg.NodeID,
- params.extendedEvalCtx.SessionData.DistSQLMode, n.plan.main,
+ params.extendedEvalCtx.SessionData.DistSQLMode,
+ &params.extendedEvalCtx.Settings.SV, n.plan.main,
)
willDistribute := distribution.WillDistribute()
outerSubqueries := params.p.curPlan.subqueryPlans
pkg/sql/opt/exec/execbuilder/testdata/distsql_auto_mode: 55 additions & 0 deletions
@@ -115,3 +115,58 @@ query T
SELECT info FROM [EXPLAIN SELECT * FROM abc WHERE b=1 AND a%2=0] WHERE info LIKE 'distribution%'
----
distribution: local

# Lookup join - distribute.
query T
SELECT info FROM [EXPLAIN SELECT a FROM abc INNER LOOKUP JOIN kv ON b = k WHERE k < 10] WHERE info LIKE 'distribution%'
----
distribution: full

# Lookup join on top of the full scan - distribute.
query T
SELECT info FROM [EXPLAIN SELECT a FROM abc INNER LOOKUP JOIN kv ON b = k] WHERE info LIKE 'distribution%'
----
distribution: full

# Limit after sort (i.e. top K sort) - distribute by default.
query T
SELECT info FROM [EXPLAIN SELECT * FROM kv WHERE k>1 ORDER BY v LIMIT 1] WHERE info LIKE 'distribution%'
----
distribution: full

# Lookup join - distribute by default.
query T
SELECT info FROM [EXPLAIN SELECT a FROM abc INNER LOOKUP JOIN kv ON b = k WHERE k < 10] WHERE info LIKE 'distribution%'
----
distribution: full

statement ok
SET CLUSTER SETTING sql.distsql.prefer_local_execution.enabled = true;

# Limit after sort (i.e. top K sort) - don't distribute when preferring local
# execution.
query T
SELECT info FROM [EXPLAIN SELECT * FROM kv WHERE k>1 ORDER BY v LIMIT 1] WHERE info LIKE 'distribution%'
----
distribution: local

# General sort - distribute.
query T
SELECT info FROM [EXPLAIN SELECT * FROM kv WHERE k>1 ORDER BY v] WHERE info LIKE 'distribution%'
----
distribution: full

# Lookup join - don't distribute when preferring local execution.
query T
SELECT info FROM [EXPLAIN SELECT a FROM abc INNER LOOKUP JOIN kv ON b = k WHERE k < 10] WHERE info LIKE 'distribution%'
----
distribution: local

# Lookup join on top of the full scan - distribute.
query T
SELECT info FROM [EXPLAIN SELECT a FROM abc INNER LOOKUP JOIN kv ON b = k] WHERE info LIKE 'distribution%'
----
distribution: full

statement ok
RESET CLUSTER SETTING sql.distsql.prefer_local_execution.enabled;
pkg/sql/schema_changer.go: 1 addition & 1 deletion
@@ -331,7 +331,7 @@ func (sc *SchemaChanger) backfillQueryIntoTable(
isLocal := !getPlanDistribution(
ctx, localPlanner, localPlanner.execCfg.NodeID,
localPlanner.extendedEvalCtx.SessionData.DistSQLMode,
- localPlanner.curPlan.main,
+ &localPlanner.EvalContext().Settings.SV, localPlanner.curPlan.main,
).WillDistribute()
out := execinfrapb.ProcessorCoreUnion{BulkRowWriter: &execinfrapb.BulkRowWriterSpec{
Table: *table,
