Skip to content

Commit

Permalink
sql: change physical planning heuristics a bit to prefer local execution
Browse files Browse the repository at this point in the history
This commit changes two parts of the physical planner heuristics:
- we now say that the lookup join "can be distributed" rather than
  "should be distributed"
- for top K sort we also say that it "can be" rather than "should be"
  distributed.

I'm not certain whether both of these changes are always beneficial, but
here is some justification.

The change to the lookup join heuristic will make it so that the
distribution of the join reader stage is determined by other stages of
the physical plan in `distsql=auto` mode. Consider an example when the
input to the lookup join is the table reader that scans only a handful
of rows. Previously, because of the "should distribute" heuristic, such
a plan would be "distributed" meaning we would plan a single table
reader on the leaseholder for the relevant range (most likely a remote
node from the perspective of the gateway node for the query); this, in
turn, would force the planning of the join reader on the same node, and
all consequent stages - if any - too. Such a decision can create
a hotspot if that particular range is queried often (think append-only
access pattern where the latest data is accessed most frequently).

With this change in such a scenario we will get more even compute
utilization across the cluster because the flow will be fully planned on
the gateway (which assumed to be chosen randomly by a load balancer),
and the lookup join will be performed from the gateway (we'll still need
to perform a remote read from the leaseholder of that single range).

The change to the top K sort heuristic seems less controversial to me,
yet I don't have a good justification. My feeling is that usually the
value of K is small, so it's ok if we don't "force" ourselves to
distribute the sort operation if the physical plan otherwise isn't
calling for it.

Overall, the choice of making changes to these two heuristics isn't very
principled and is driven by a single query from one of our largest
customers which happened to hit the hot spot scenario as described
above. In their case, they have append-like workload that is constantly
updating a single range. Eventually that range is split automatically,
but both new ranges stay on the same node. The latest data is accessed
far more frequently than any other data in the table, yet according to
the KV heuristics the ranges aren't being reallocated because the scans
hitting the hot ranges aren't exceeding the threshold. What isn't
accounted for is the fact that other parts of the flow are far more
compute-intensive, so this change attempts to alleviate such a hot node
scenario.

Release note (sql change): Some queries with lookup joins and/or top
K sorts are now more likely to be executed in "local" manner with
`distsql=auto` session variable.
  • Loading branch information
yuzefovich committed Aug 6, 2021
1 parent ca77dc5 commit 41b1d5f
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 24 deletions.
49 changes: 27 additions & 22 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,23 +364,23 @@ func mustWrapValuesNode(planCtx *PlanningCtx, specifiedInQuery bool) bool {
// The error doesn't indicate complete failure - it's instead the reason that
// this plan couldn't be distributed.
// TODO(radu): add tests for this.
func checkSupportForPlanNode(node planNode) (distRecommendation, error) {
func checkSupportForPlanNode(node planNode, outputNodeHasLimit bool) (distRecommendation, error) {
switch n := node.(type) {
// Keep these cases alphabetized, please!
case *distinctNode:
return checkSupportForPlanNode(n.plan)
return checkSupportForPlanNode(n.plan, false /* outputNodeHasLimit */)

case *exportNode:
return checkSupportForPlanNode(n.source)
return checkSupportForPlanNode(n.source, false /* outputNodeHasLimit */)

case *filterNode:
if err := checkExpr(n.filter); err != nil {
return cannotDistribute, err
}
return checkSupportForPlanNode(n.source.plan)
return checkSupportForPlanNode(n.source.plan, false /* outputNodeHasLimit */)

case *groupNode:
rec, err := checkSupportForPlanNode(n.plan)
rec, err := checkSupportForPlanNode(n.plan, false /* outputNodeHasLimit */)
if err != nil {
return cannotDistribute, err
}
Expand All @@ -390,10 +390,10 @@ func checkSupportForPlanNode(node planNode) (distRecommendation, error) {
case *indexJoinNode:
// n.table doesn't have meaningful spans, but we need to check support (e.g.
// for any filtering expression).
if _, err := checkSupportForPlanNode(n.table); err != nil {
if _, err := checkSupportForPlanNode(n.table, false /* outputNodeHasLimit */); err != nil {
return cannotDistribute, err
}
return checkSupportForPlanNode(n.input)
return checkSupportForPlanNode(n.input, false /* outputNodeHasLimit */)

case *invertedFilterNode:
return checkSupportForInvertedFilterNode(n)
Expand All @@ -402,7 +402,7 @@ func checkSupportForPlanNode(node planNode) (distRecommendation, error) {
if err := checkExpr(n.onExpr); err != nil {
return cannotDistribute, err
}
rec, err := checkSupportForPlanNode(n.input)
rec, err := checkSupportForPlanNode(n.input, false /* outputNodeHasLimit */)
if err != nil {
return cannotDistribute, err
}
Expand All @@ -412,11 +412,11 @@ func checkSupportForPlanNode(node planNode) (distRecommendation, error) {
if err := checkExpr(n.pred.onCond); err != nil {
return cannotDistribute, err
}
recLeft, err := checkSupportForPlanNode(n.left.plan)
recLeft, err := checkSupportForPlanNode(n.left.plan, false /* outputNodeHasLimit */)
if err != nil {
return cannotDistribute, err
}
recRight, err := checkSupportForPlanNode(n.right.plan)
recRight, err := checkSupportForPlanNode(n.right.plan, false /* outputNodeHasLimit */)
if err != nil {
return cannotDistribute, err
}
Expand All @@ -433,7 +433,7 @@ func checkSupportForPlanNode(node planNode) (distRecommendation, error) {
// Note that we don't need to check whether we support distribution of
// n.countExpr or n.offsetExpr because those expressions are evaluated
// locally, during the physical planning.
return checkSupportForPlanNode(n.plan)
return checkSupportForPlanNode(n.plan, true /* outputNodeHasLimit */)

case *lookupJoinNode:
if n.table.lockingStrength != descpb.ScanLockingStrength_FOR_NONE {
Expand All @@ -453,27 +453,27 @@ func checkSupportForPlanNode(node planNode) (distRecommendation, error) {
if err := checkExpr(n.onCond); err != nil {
return cannotDistribute, err
}
rec, err := checkSupportForPlanNode(n.input)
rec, err := checkSupportForPlanNode(n.input, false /* outputNodeHasLimit */)
if err != nil {
return cannotDistribute, err
}
return rec.compose(shouldDistribute), nil
return rec.compose(canDistribute), nil

case *ordinalityNode:
// WITH ORDINALITY never gets distributed so that the gateway node can
// always number each row in order.
return cannotDistribute, nil

case *projectSetNode:
return checkSupportForPlanNode(n.source)
return checkSupportForPlanNode(n.source, false /* outputNodeHasLimit */)

case *renderNode:
for _, e := range n.render {
if err := checkExpr(e); err != nil {
return cannotDistribute, err
}
}
return checkSupportForPlanNode(n.source.plan)
return checkSupportForPlanNode(n.source.plan, outputNodeHasLimit)

case *scanNode:
if n.lockingStrength != descpb.ScanLockingStrength_FOR_NONE {
Expand Down Expand Up @@ -502,23 +502,28 @@ func checkSupportForPlanNode(node planNode) (distRecommendation, error) {
}

case *sortNode:
rec, err := checkSupportForPlanNode(n.plan)
rec, err := checkSupportForPlanNode(n.plan, false /* outputNodeHasLimit */)
if err != nil {
return cannotDistribute, err
}
// If we have to sort, distribute the query.
rec = rec.compose(shouldDistribute)
if outputNodeHasLimit {
// If we have a top K sort, we can distribute the query.
rec = rec.compose(canDistribute)
} else {
// If we have to sort, distribute the query.
rec = rec.compose(shouldDistribute)
}
return rec, nil

case *unaryNode:
return canDistribute, nil

case *unionNode:
recLeft, err := checkSupportForPlanNode(n.left)
recLeft, err := checkSupportForPlanNode(n.left, false /* outputNodeHasLimit */)
if err != nil {
return cannotDistribute, err
}
recRight, err := checkSupportForPlanNode(n.right)
recRight, err := checkSupportForPlanNode(n.right, false /* outputNodeHasLimit */)
if err != nil {
return cannotDistribute, err
}
Expand All @@ -542,7 +547,7 @@ func checkSupportForPlanNode(node planNode) (distRecommendation, error) {
return canDistribute, nil

case *windowNode:
return checkSupportForPlanNode(n.plan)
return checkSupportForPlanNode(n.plan, false /* outputNodeHasLimit */)

case *zeroNode:
return canDistribute, nil
Expand All @@ -565,7 +570,7 @@ func checkSupportForPlanNode(node planNode) (distRecommendation, error) {
}

func checkSupportForInvertedFilterNode(n *invertedFilterNode) (distRecommendation, error) {
rec, err := checkSupportForPlanNode(n.input)
rec, err := checkSupportForPlanNode(n.input, false /* outputNodeHasLimit */)
if err != nil {
return cannotDistribute, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1258,7 +1258,7 @@ func getPlanDistribution(
return physicalplan.LocalPlan
}

rec, err := checkSupportForPlanNode(plan.planNode)
rec, err := checkSupportForPlanNode(plan.planNode, false /* outputNodeHasLimit */)
if err != nil {
// Don't use distSQL for this request.
log.VEventf(ctx, 1, "query not supported for distSQL: %s", err)
Expand Down
8 changes: 7 additions & 1 deletion pkg/sql/opt/exec/execbuilder/testdata/distsql_auto_mode
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,16 @@ SELECT info FROM [EXPLAIN SELECT * FROM kv UNION SELECT * FROM kv LIMIT 1] WHERE
----
distribution: full

# Limit after sort - distribute.
# Limit after sort (i.e. top K sort) - don't distribute.
query T
SELECT info FROM [EXPLAIN SELECT * FROM kv WHERE k>1 ORDER BY v LIMIT 1] WHERE info LIKE 'distribution%'
----
distribution: local

# General sort - distribute.
query T
SELECT info FROM [EXPLAIN SELECT * FROM kv WHERE k>1 ORDER BY v] WHERE info LIKE 'distribution%'
----
distribution: full

# Limit after aggregation - distribute.
Expand Down

0 comments on commit 41b1d5f

Please sign in to comment.