From 8d5751de1a5a03df1638062540a97e1cc2ec47bf Mon Sep 17 00:00:00 2001
From: Yahor Yuzefovich
Date: Thu, 5 Aug 2021 20:00:33 -0700
Subject: [PATCH] sql: change physical planning heuristics a bit to prefer
 local execution

This commit changes two parts of the physical planner heuristics:
- we now say that the lookup join "can be distributed" rather than
  "should be distributed"
- for the top K sort we likewise say that it "can be" rather than
  "should be" distributed.

I'm not certain whether both of these changes are always beneficial, but
here is some justification.

The change to the lookup join heuristic makes it so that the distribution
of the join reader stage is determined by the other stages of the physical
plan in `distsql=auto` mode. Consider an example in which the input to the
lookup join is a table reader that scans only a handful of rows.
Previously, because of the "should distribute" heuristic, such a plan would
be "distributed", meaning we would plan a single table reader on the
leaseholder of the relevant range (most likely a remote node from the
perspective of the gateway node for the query); this, in turn, would force
the join reader to be planned on the same node, and all subsequent stages,
if any, as well. Such a decision can create a hotspot if that particular
range is queried often (think of an append-only access pattern where the
latest data is accessed most frequently). With this change, in such a
scenario we get more even compute utilization across the cluster because
the flow is fully planned on the gateway (which is assumed to be chosen
randomly by a load balancer), and the lookup join is performed from the
gateway (we still need to perform a remote read from the leaseholder of
that single range).

The change to the top K sort heuristic seems less controversial to me, yet
I don't have a strong justification for it. My feeling is that the value of
K is usually small, so it's ok not to "force" ourselves to distribute the
sort operation if the physical plan otherwise isn't calling for
distribution.

Overall, the choice to change these two heuristics isn't very principled
and is driven by a single query from one of our largest customers that
happened to hit the hot spot scenario described above. In their case, an
append-like workload constantly updates a single range. Eventually that
range is split automatically, but both new ranges stay on the same node.
The latest data is accessed far more frequently than any other data in the
table, yet according to the KV heuristics the ranges aren't rebalanced
because the scans hitting the hot ranges don't exceed the threshold. What
isn't accounted for is that the other parts of the flow are far more
compute-intensive, so this change attempts to alleviate such a hot node
scenario.

Release note (sql change): Some queries with lookup joins and/or top K
sorts are now more likely to be executed in a "local" manner with the
`distsql=auto` session variable.
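As an illustrative aside (not part of the patch itself), and assuming the
same kv table that the distsql_auto_mode logic tests below operate on, the
effect of the sort heuristic change can be observed with EXPLAIN under the
auto mode:

    SET distsql = auto;

    -- Top K sort: with the LIMIT, the sort no longer forces distribution,
    -- so this plan can now stay on the gateway (distribution: local).
    EXPLAIN SELECT * FROM kv WHERE k > 1 ORDER BY v LIMIT 1;

    -- General sort (no LIMIT): still recommended for distribution
    -- (distribution: full).
    EXPLAIN SELECT * FROM kv WHERE k > 1 ORDER BY v;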
---
 .../testdata/logic_test/regional_by_row      |  8 +--
 pkg/sql/distsql_physical_planner.go          | 49 ++++++++++---------
 pkg/sql/exec_util.go                         |  2 +-
 .../execbuilder/testdata/distsql_auto_mode   | 20 +++++++-
 4 files changed, 51 insertions(+), 28 deletions(-)

diff --git a/pkg/ccl/logictestccl/testdata/logic_test/regional_by_row b/pkg/ccl/logictestccl/testdata/logic_test/regional_by_row
index ea7cd3d16342..e7def48f5f74 100644
--- a/pkg/ccl/logictestccl/testdata/logic_test/regional_by_row
+++ b/pkg/ccl/logictestccl/testdata/logic_test/regional_by_row
@@ -926,7 +926,7 @@ SELECT message FROM [SHOW KV TRACE FOR SESSION] WITH ORDINALITY
    OR message LIKE 'Scan%'
 ORDER BY ordinality ASC
 ----
-Scan /Table/75/1/"@"/10/0{-/NULL}, /Table/75/1/"\x80"/10/0{-/NULL}, /Table/75/1/"\xc0"/10/0{-/NULL}
+Scan /Table/75/1/"@"/10/0, /Table/75/1/"\x80"/10/0, /Table/75/1/"\xc0"/10/0
 fetched: /child/primary/'ap-southeast-2'/10/c_p_id -> /10
 Scan /Table/74/1/"@"/10/0, /Table/74/1/"\x80"/10/0, /Table/74/1/"\xc0"/10/0
 fetched: /parent/primary/'ap-southeast-2'/10 -> NULL
@@ -956,7 +956,7 @@ SELECT message FROM [SHOW KV TRACE FOR SESSION] WITH ORDINALITY
    OR message LIKE 'Scan%'
 ORDER BY ordinality ASC
 ----
-Scan /Table/75/1/"@"/10/0{-/NULL}, /Table/75/1/"\x80"/10/0{-/NULL}, /Table/75/1/"\xc0"/10/0{-/NULL}
+Scan /Table/75/1/"@"/10/0, /Table/75/1/"\x80"/10/0, /Table/75/1/"\xc0"/10/0
 fetched: /child/primary/'ap-southeast-2'/10/c_p_id -> /10
 Scan /Table/74/1/"@"/10/0, /Table/74/1/"\x80"/10/0, /Table/74/1/"\xc0"/10/0
 fetched: /parent/primary/'ap-southeast-2'/10 -> NULL
@@ -987,7 +987,7 @@ SELECT message FROM [SHOW KV TRACE FOR SESSION] WITH ORDINALITY
    OR message LIKE 'Scan%'
 ORDER BY ordinality ASC
 ----
-Scan /Table/75/1/"@"/10/0{-/NULL}, /Table/75/1/"\x80"/10/0{-/NULL}, /Table/75/1/"\xc0"/10/0{-/NULL}
+Scan /Table/75/1/"@"/10/0, /Table/75/1/"\x80"/10/0, /Table/75/1/"\xc0"/10/0
 fetched: /child/primary/'ap-southeast-2'/10/c_p_id -> /10
 Scan /Table/74/1/"@"/10/0, /Table/74/1/"\x80"/10/0, /Table/74/1/"\xc0"/10/0
 fetched: /parent/primary/'ap-southeast-2'/10 -> NULL
@@ -1018,7 +1018,7 @@ SELECT message FROM [SHOW KV TRACE FOR SESSION] WITH ORDINALITY
    OR message LIKE 'Scan%'
 ORDER BY ordinality ASC
 ----
-Scan /Table/75/1/"@"/10/0{-/NULL}, /Table/75/1/"\x80"/10/0{-/NULL}, /Table/75/1/"\xc0"/10/0{-/NULL}
+Scan /Table/75/1/"@"/10/0, /Table/75/1/"\x80"/10/0, /Table/75/1/"\xc0"/10/0
 fetched: /child/primary/'ap-southeast-2'/10/c_p_id -> /10
 Scan /Table/74/1/"@"/10/0, /Table/74/1/"\x80"/10/0, /Table/74/1/"\xc0"/10/0
 fetched: /parent/primary/'ap-southeast-2'/10 -> NULL
diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go
index f08895203817..d4492cddded7 100644
--- a/pkg/sql/distsql_physical_planner.go
+++ b/pkg/sql/distsql_physical_planner.go
@@ -364,23 +364,23 @@ func mustWrapValuesNode(planCtx *PlanningCtx, specifiedInQuery bool) bool {
 // The error doesn't indicate complete failure - it's instead the reason that
 // this plan couldn't be distributed.
 // TODO(radu): add tests for this.
-func checkSupportForPlanNode(node planNode) (distRecommendation, error) {
+func checkSupportForPlanNode(node planNode, outputNodeHasLimit bool) (distRecommendation, error) {
 	switch n := node.(type) {
 	// Keep these cases alphabetized, please!
 	case *distinctNode:
-		return checkSupportForPlanNode(n.plan)
+		return checkSupportForPlanNode(n.plan, false /* outputNodeHasLimit */)

 	case *exportNode:
-		return checkSupportForPlanNode(n.source)
+		return checkSupportForPlanNode(n.source, false /* outputNodeHasLimit */)

 	case *filterNode:
 		if err := checkExpr(n.filter); err != nil {
 			return cannotDistribute, err
 		}
-		return checkSupportForPlanNode(n.source.plan)
+		return checkSupportForPlanNode(n.source.plan, false /* outputNodeHasLimit */)

 	case *groupNode:
-		rec, err := checkSupportForPlanNode(n.plan)
+		rec, err := checkSupportForPlanNode(n.plan, false /* outputNodeHasLimit */)
 		if err != nil {
 			return cannotDistribute, err
 		}
@@ -390,10 +390,10 @@ func checkSupportForPlanNode(node planNode) (distRecommendation, error) {
 	case *indexJoinNode:
 		// n.table doesn't have meaningful spans, but we need to check support (e.g.
 		// for any filtering expression).
-		if _, err := checkSupportForPlanNode(n.table); err != nil {
+		if _, err := checkSupportForPlanNode(n.table, false /* outputNodeHasLimit */); err != nil {
 			return cannotDistribute, err
 		}
-		return checkSupportForPlanNode(n.input)
+		return checkSupportForPlanNode(n.input, false /* outputNodeHasLimit */)

 	case *invertedFilterNode:
 		return checkSupportForInvertedFilterNode(n)
@@ -402,7 +402,7 @@ func checkSupportForPlanNode(node planNode) (distRecommendation, error) {
 		if err := checkExpr(n.onExpr); err != nil {
 			return cannotDistribute, err
 		}
-		rec, err := checkSupportForPlanNode(n.input)
+		rec, err := checkSupportForPlanNode(n.input, false /* outputNodeHasLimit */)
 		if err != nil {
 			return cannotDistribute, err
 		}
@@ -412,11 +412,11 @@ func checkSupportForPlanNode(node planNode) (distRecommendation, error) {
 		if err := checkExpr(n.pred.onCond); err != nil {
 			return cannotDistribute, err
 		}
-		recLeft, err := checkSupportForPlanNode(n.left.plan)
+		recLeft, err := checkSupportForPlanNode(n.left.plan, false /* outputNodeHasLimit */)
 		if err != nil {
 			return cannotDistribute, err
 		}
-		recRight, err := checkSupportForPlanNode(n.right.plan)
+		recRight, err := checkSupportForPlanNode(n.right.plan, false /* outputNodeHasLimit */)
 		if err != nil {
 			return cannotDistribute, err
 		}
@@ -433,7 +433,7 @@ func checkSupportForPlanNode(node planNode) (distRecommendation, error) {
 		// Note that we don't need to check whether we support distribution of
 		// n.countExpr or n.offsetExpr because those expressions are evaluated
 		// locally, during the physical planning.
-		return checkSupportForPlanNode(n.plan)
+		return checkSupportForPlanNode(n.plan, true /* outputNodeHasLimit */)

 	case *lookupJoinNode:
 		if n.table.lockingStrength != descpb.ScanLockingStrength_FOR_NONE {
@@ -453,11 +453,11 @@ func checkSupportForPlanNode(node planNode) (distRecommendation, error) {
 		if err := checkExpr(n.onCond); err != nil {
 			return cannotDistribute, err
 		}
-		rec, err := checkSupportForPlanNode(n.input)
+		rec, err := checkSupportForPlanNode(n.input, false /* outputNodeHasLimit */)
 		if err != nil {
 			return cannotDistribute, err
 		}
-		return rec.compose(shouldDistribute), nil
+		return rec.compose(canDistribute), nil

 	case *ordinalityNode:
 		// WITH ORDINALITY never gets distributed so that the gateway node can
@@ -465,7 +465,7 @@ func checkSupportForPlanNode(node planNode) (distRecommendation, error) {
 		return cannotDistribute, nil

 	case *projectSetNode:
-		return checkSupportForPlanNode(n.source)
+		return checkSupportForPlanNode(n.source, false /* outputNodeHasLimit */)

 	case *renderNode:
 		for _, e := range n.render {
@@ -473,7 +473,7 @@ func checkSupportForPlanNode(node planNode) (distRecommendation, error) {
 				return cannotDistribute, err
 			}
 		}
-		return checkSupportForPlanNode(n.source.plan)
+		return checkSupportForPlanNode(n.source.plan, outputNodeHasLimit)

 	case *scanNode:
 		if n.lockingStrength != descpb.ScanLockingStrength_FOR_NONE {
@@ -502,23 +502,28 @@ func checkSupportForPlanNode(node planNode) (distRecommendation, error) {
 		}

 	case *sortNode:
-		rec, err := checkSupportForPlanNode(n.plan)
+		rec, err := checkSupportForPlanNode(n.plan, false /* outputNodeHasLimit */)
 		if err != nil {
 			return cannotDistribute, err
 		}
-		// If we have to sort, distribute the query.
-		rec = rec.compose(shouldDistribute)
+		if outputNodeHasLimit {
+			// If we have a top K sort, we can distribute the query.
+			rec = rec.compose(canDistribute)
+		} else {
+			// If we have to sort, distribute the query.
+			rec = rec.compose(shouldDistribute)
+		}
 		return rec, nil

 	case *unaryNode:
 		return canDistribute, nil

 	case *unionNode:
-		recLeft, err := checkSupportForPlanNode(n.left)
+		recLeft, err := checkSupportForPlanNode(n.left, false /* outputNodeHasLimit */)
 		if err != nil {
 			return cannotDistribute, err
 		}
-		recRight, err := checkSupportForPlanNode(n.right)
+		recRight, err := checkSupportForPlanNode(n.right, false /* outputNodeHasLimit */)
 		if err != nil {
 			return cannotDistribute, err
 		}
@@ -542,7 +547,7 @@ func checkSupportForPlanNode(node planNode) (distRecommendation, error) {
 		return canDistribute, nil

 	case *windowNode:
-		return checkSupportForPlanNode(n.plan)
+		return checkSupportForPlanNode(n.plan, false /* outputNodeHasLimit */)

 	case *zeroNode:
 		return canDistribute, nil
@@ -565,7 +570,7 @@ func checkSupportForPlanNode(node planNode) (distRecommendation, error) {
 }

 func checkSupportForInvertedFilterNode(n *invertedFilterNode) (distRecommendation, error) {
-	rec, err := checkSupportForPlanNode(n.input)
+	rec, err := checkSupportForPlanNode(n.input, false /* outputNodeHasLimit */)
 	if err != nil {
 		return cannotDistribute, err
 	}
diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go
index ba3c2383a5e4..d1d716d6e6cd 100644
--- a/pkg/sql/exec_util.go
+++ b/pkg/sql/exec_util.go
@@ -1269,7 +1269,7 @@ func getPlanDistribution(
 		return physicalplan.LocalPlan
 	}

-	rec, err := checkSupportForPlanNode(plan.planNode)
+	rec, err := checkSupportForPlanNode(plan.planNode, false /* outputNodeHasLimit */)
 	if err != nil {
 		// Don't use distSQL for this request.
 		log.VEventf(ctx, 1, "query not supported for distSQL: %s", err)
diff --git a/pkg/sql/opt/exec/execbuilder/testdata/distsql_auto_mode b/pkg/sql/opt/exec/execbuilder/testdata/distsql_auto_mode
index b77f3cc1fbb7..9023375292d3 100644
--- a/pkg/sql/opt/exec/execbuilder/testdata/distsql_auto_mode
+++ b/pkg/sql/opt/exec/execbuilder/testdata/distsql_auto_mode
@@ -68,10 +68,16 @@ SELECT info FROM [EXPLAIN SELECT * FROM kv UNION SELECT * FROM kv LIMIT 1] WHERE
 ----
 distribution: full

-# Limit after sort - distribute.
+# Limit after sort (i.e. top K sort) - don't distribute.
 query T
 SELECT info FROM [EXPLAIN SELECT * FROM kv WHERE k>1 ORDER BY v LIMIT 1] WHERE info LIKE 'distribution%'
 ----
+distribution: local
+
+# General sort - distribute.
+query T
+SELECT info FROM [EXPLAIN SELECT * FROM kv WHERE k>1 ORDER BY v] WHERE info LIKE 'distribution%'
+----
 distribution: full

 # Limit after aggregation - distribute.
@@ -115,3 +121,15 @@ query T
 SELECT info FROM [EXPLAIN SELECT * FROM abc WHERE b=1 AND a%2=0] WHERE info LIKE 'distribution%'
 ----
 distribution: local
+
+# Lookup join - don't distribute.
+query T
+SELECT info FROM [EXPLAIN SELECT a FROM abc INNER LOOKUP JOIN kv ON b = k WHERE k < 10] WHERE info LIKE 'distribution%'
+----
+distribution: local
+
+# Lookup join on top of the full scan - distribute.
+query T
+SELECT info FROM [EXPLAIN SELECT a FROM abc INNER LOOKUP JOIN kv ON b = k] WHERE info LIKE 'distribution%'
+----
+distribution: full