Skip to content

Commit

Permalink
Merge #137562
Browse files Browse the repository at this point in the history
137562: sql: adjust physical planning heuristics around small joins r=yuzefovich a=yuzefovich

This PR adjusts the physical planner heuristics so that we no longer force the plan distribution in `distsql=auto` mode when we have a hash or a merge join with at least one equality column (i.e. non-cross join) when we have "small" inputs (less than 1k rows combined). See each commit for details.

Epic: None

Co-authored-by: Yahor Yuzefovich <[email protected]>
  • Loading branch information
craig[bot] and yuzefovich committed Dec 18, 2024
2 parents 5169369 + 2385731 commit d8de5b8
Show file tree
Hide file tree
Showing 14 changed files with 296 additions and 80 deletions.
182 changes: 116 additions & 66 deletions pkg/sql/distsql_physical_planner.go

Large diffs are not rendered by default.

22 changes: 19 additions & 3 deletions pkg/sql/distsql_spec_exec_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,11 +425,13 @@ func (e *distSQLSpecExecFactory) ConstructHashJoin(
leftEqCols, rightEqCols []exec.NodeColumnOrdinal,
leftEqColsAreKey, rightEqColsAreKey bool,
extraOnCond tree.TypedExpr,
estimatedLeftRowCount, estimatedRightRowCount uint64,
) (exec.Node, error) {
return e.constructHashOrMergeJoin(
joinType, left, right, extraOnCond, leftEqCols, rightEqCols,
leftEqColsAreKey, rightEqColsAreKey,
ReqOrdering{} /* mergeJoinOrdering */, exec.OutputOrdering{}, /* reqOrdering */
estimatedLeftRowCount, estimatedRightRowCount,
)
}

Expand All @@ -440,6 +442,7 @@ func (e *distSQLSpecExecFactory) ConstructMergeJoin(
leftOrdering, rightOrdering colinfo.ColumnOrdering,
reqOrdering exec.OutputOrdering,
leftEqColsAreKey, rightEqColsAreKey bool,
estimatedLeftRowCount, estimatedRightRowCount uint64,
) (exec.Node, error) {
leftEqCols, rightEqCols, mergeJoinOrdering, err := getEqualityIndicesAndMergeJoinOrdering(leftOrdering, rightOrdering)
if err != nil {
Expand All @@ -448,6 +451,7 @@ func (e *distSQLSpecExecFactory) ConstructMergeJoin(
return e.constructHashOrMergeJoin(
joinType, left, right, onCond, leftEqCols, rightEqCols,
leftEqColsAreKey, rightEqColsAreKey, mergeJoinOrdering, reqOrdering,
estimatedLeftRowCount, estimatedRightRowCount,
)
}

Expand Down Expand Up @@ -1238,6 +1242,7 @@ func (e *distSQLSpecExecFactory) constructHashOrMergeJoin(
leftEqColsAreKey, rightEqColsAreKey bool,
mergeJoinOrdering colinfo.ColumnOrdering,
reqOrdering exec.OutputOrdering,
estimatedLeftRowCount, estimatedRightRowCount uint64,
) (exec.Node, error) {
leftPhysPlan, leftPlan := getPhysPlan(left)
rightPhysPlan, rightPlan := getPhysPlan(right)
Expand All @@ -1251,9 +1256,20 @@ func (e *distSQLSpecExecFactory) constructHashOrMergeJoin(
rightPlanToStreamColMap: rightMap,
}
post, joinToStreamColMap := helper.joinOutColumns(joinType, resultColumns)
// We always try to distribute the join, but planJoiners() itself might
// decide not to.
planCtx := e.getPlanCtx(shouldDistribute)
rec := canDistribute
if len(leftEqCols) > 0 {
// We can partition both streams on the equality columns.
if estimatedLeftRowCount == 0 && estimatedRightRowCount == 0 {
// In the absence of stats for both inputs, fall back to
// distributing.
rec = shouldDistribute
} else if estimatedLeftRowCount+estimatedRightRowCount >= e.planner.SessionData().DistributeGroupByRowCountThreshold {
// If we have stats on at least one input, then distribute only if
// the join appears to be "large".
rec = shouldDistribute
}
}
planCtx := e.getPlanCtx(rec)
onExpr, err := helper.remapOnExpr(e.ctx, planCtx, onCond)
if err != nil {
return nil, err
Expand Down
6 changes: 5 additions & 1 deletion pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1973,7 +1973,7 @@ func getPlanDistribution(
return physicalplan.LocalPlan, nil
}

rec, err := checkSupportForPlanNode(plan.planNode, distSQLVisitor, sd)
rec, err := checkSupportForPlanNode(ctx, plan.planNode, distSQLVisitor, sd)
if err != nil {
// Don't use distSQL for this request.
log.VEventf(ctx, 1, "query not supported for distSQL: %s", err)
Expand Down Expand Up @@ -3343,6 +3343,10 @@ func (m *sessionDataMutator) SetAlwaysDistributeFullScans(val bool) {
m.data.AlwaysDistributeFullScans = val
}

func (m *sessionDataMutator) SetDistributeJoinRowCountThreshold(val uint64) {
m.data.DistributeJoinRowCountThreshold = val
}

func (m *sessionDataMutator) SetDisableVecUnionEagerCancellation(val bool) {
m.data.DisableVecUnionEagerCancellation = val
}
Expand Down
22 changes: 17 additions & 5 deletions pkg/sql/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,28 @@ type joinNode struct {

// columns contains the metadata for the results of this node.
columns colinfo.ResultColumns

// estimatedLeftRowCount, when set, is the estimated number of rows that
// the left input will produce.
estimatedLeftRowCount uint64
// estimatedRightRowCount, when set, is the estimated number of rows that
// the right input will produce.
estimatedRightRowCount uint64
}

func (p *planner) makeJoinNode(
left planDataSource, right planDataSource, pred *joinPredicate,
left planDataSource,
right planDataSource,
pred *joinPredicate,
estimatedLeftRowCount, estimatedRightRowCount uint64,
) *joinNode {
n := &joinNode{
left: left,
right: right,
pred: pred,
columns: pred.cols,
left: left,
right: right,
pred: pred,
columns: pred.cols,
estimatedLeftRowCount: estimatedLeftRowCount,
estimatedRightRowCount: estimatedRightRowCount,
}
return n
}
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/logictest/testdata/logic_test/information_schema
Original file line number Diff line number Diff line change
Expand Up @@ -3939,6 +3939,7 @@ disable_plan_gists off
disable_vec_union_eager_cancellation off
disallow_full_table_scans off
distribute_group_by_row_count_threshold 1000
distribute_join_row_count_threshold 1000
distribute_scan_row_count_threshold 10000
distribute_sort_row_count_threshold 1000
distsql_plan_gateway_bias 2
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/pg_catalog
Original file line number Diff line number Diff line change
Expand Up @@ -2939,6 +2939,7 @@ disable_plan_gists off N
disable_vec_union_eager_cancellation off NULL NULL NULL string
disallow_full_table_scans off NULL NULL NULL string
distribute_group_by_row_count_threshold 1000 NULL NULL NULL string
distribute_join_row_count_threshold 1000 NULL NULL NULL string
distribute_scan_row_count_threshold 10000 NULL NULL NULL string
distribute_sort_row_count_threshold 1000 NULL NULL NULL string
distsql off NULL NULL NULL string
Expand Down Expand Up @@ -3141,6 +3142,7 @@ disable_plan_gists off N
disable_vec_union_eager_cancellation off NULL user NULL off off
disallow_full_table_scans off NULL user NULL off off
distribute_group_by_row_count_threshold 1000 NULL user NULL 1000 1000
distribute_join_row_count_threshold 1000 NULL user NULL 1000 1000
distribute_scan_row_count_threshold 10000 NULL user NULL 10000 10000
distribute_sort_row_count_threshold 1000 NULL user NULL 1000 1000
distsql off NULL user NULL off off
Expand Down Expand Up @@ -3339,6 +3341,7 @@ disable_plan_gists NULL NULL NULL
disable_vec_union_eager_cancellation NULL NULL NULL NULL NULL
disallow_full_table_scans NULL NULL NULL NULL NULL
distribute_group_by_row_count_threshold NULL NULL NULL NULL NULL
distribute_join_row_count_threshold NULL NULL NULL NULL NULL
distribute_scan_row_count_threshold NULL NULL NULL NULL NULL
distribute_sort_row_count_threshold NULL NULL NULL NULL NULL
distsql NULL NULL NULL NULL NULL
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/logictest/testdata/logic_test/show_source
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ disable_plan_gists off
disable_vec_union_eager_cancellation off
disallow_full_table_scans off
distribute_group_by_row_count_threshold 1000
distribute_join_row_count_threshold 1000
distribute_scan_row_count_threshold 10000
distribute_sort_row_count_threshold 1000
distsql off
Expand Down
24 changes: 24 additions & 0 deletions pkg/sql/opt/exec/execbuilder/relational.go
Original file line number Diff line number Diff line change
Expand Up @@ -1356,6 +1356,10 @@ func (b *Builder) buildHashJoin(join memo.RelExpr) (_ execPlan, outputCols colOr
// to apply join hints to semi or anti joins. Join hints are only
// possible on explicit joins using the JOIN keyword, and semi and anti
// joins are only created from implicit joins without the JOIN keyword.
//
// Note that we use the row count estimates even when stats are
// unavailable so that the input that is more likely to be smaller ended
// up on the right side.
leftRowCount := leftExpr.Relational().Statistics().RowCount
rightRowCount := rightExpr.Relational().Statistics().RowCount
if leftRowCount < rightRowCount {
Expand Down Expand Up @@ -1430,6 +1434,13 @@ func (b *Builder) buildHashJoin(join memo.RelExpr) (_ execPlan, outputCols colOr

leftEqColsAreKey := leftExpr.Relational().FuncDeps.ColsAreStrictKey(leftEq.ToSet())
rightEqColsAreKey := rightExpr.Relational().FuncDeps.ColsAreStrictKey(rightEq.ToSet())
var leftRowCount, rightRowCount uint64
if leftExpr.Relational().Statistics().Available {
leftRowCount = uint64(leftExpr.Relational().Statistics().RowCount)
}
if rightExpr.Relational().Statistics().Available {
rightRowCount = uint64(rightExpr.Relational().Statistics().RowCount)
}

b.recordJoinType(joinType)
if isCrossJoin {
Expand All @@ -1444,6 +1455,7 @@ func (b *Builder) buildHashJoin(join memo.RelExpr) (_ execPlan, outputCols colOr
leftEqOrdinals, rightEqOrdinals,
leftEqColsAreKey, rightEqColsAreKey,
onExpr,
leftRowCount, rightRowCount,
)
if err != nil {
return execPlan{}, colOrdMap{}, err
Expand All @@ -1470,6 +1482,10 @@ func (b *Builder) buildMergeJoin(
// We have a partial join, and we want to make sure that the relation
// with smaller cardinality is on the right side. Note that we assumed
// it during the costing.
//
// Note that we use the row count estimates even when stats are
// unavailable so that the input that is more likely to be smaller ended
// up on the right side.
// TODO(raduberinde): we might also need to look at memo.JoinFlags when
// choosing a side.
leftRowCount := leftExpr.Relational().Statistics().RowCount
Expand Down Expand Up @@ -1527,6 +1543,13 @@ func (b *Builder) buildMergeJoin(
}
leftEqColsAreKey := leftExpr.Relational().FuncDeps.ColsAreStrictKey(leftEq.ColSet())
rightEqColsAreKey := rightExpr.Relational().FuncDeps.ColsAreStrictKey(rightEq.ColSet())
var leftRowCount, rightRowCount uint64
if leftExpr.Relational().Statistics().Available {
leftRowCount = uint64(leftExpr.Relational().Statistics().RowCount)
}
if rightExpr.Relational().Statistics().Available {
rightRowCount = uint64(rightExpr.Relational().Statistics().RowCount)
}
b.recordJoinType(joinType)
b.recordJoinAlgorithm(exec.MergeJoin)
var ep execPlan
Expand All @@ -1536,6 +1559,7 @@ func (b *Builder) buildMergeJoin(
onExpr,
leftOrd, rightOrd, reqOrd,
leftEqColsAreKey, rightEqColsAreKey,
leftRowCount, rightRowCount,
)
if err != nil {
return execPlan{}, colOrdMap{}, err
Expand Down
49 changes: 46 additions & 3 deletions pkg/sql/opt/exec/execbuilder/testdata/distsql_auto_mode
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,24 @@ SELECT info FROM [EXPLAIN SELECT * FROM kv WHERE k>1] WHERE info LIKE 'distribut
----
distribution: local

# Sort (no stats) - distribute.
query T
SELECT info FROM [EXPLAIN SELECT * FROM kv WHERE k>1 ORDER BY v] WHERE info LIKE 'distribution%'
----
distribution: full

# Hash join (no stats) - distribute.
query T
SELECT info FROM [EXPLAIN SELECT * FROM kv AS t1, kv AS t2 WHERE t1.k = t2.k+1 AND t1.k>1 AND t2.k>1] WHERE info LIKE 'distribution%'
----
distribution: full

# Cross join (no stats) - don't distribute.
query T
SELECT info FROM [EXPLAIN SELECT * FROM kv AS t1, kv AS t2 WHERE t1.k>1 AND t2.k>1] WHERE info LIKE 'distribution%'
----
distribution: local

statement ok
ALTER TABLE kv INJECT STATISTICS '[
{
Expand Down Expand Up @@ -186,20 +204,45 @@ SELECT info FROM [EXPLAIN SELECT k, sum(v) FROM kv WHERE k>1 GROUP BY k LIMIT 1]
----
distribution: full

# Hash join over large inputs - distribute.
query T
SELECT info FROM [EXPLAIN SELECT * FROM kv AS t1, kv AS t2 WHERE t1.k = t2.k+1 AND t1.k>1 AND t2.k>1] WHERE info LIKE 'distribution%'
----
distribution: full

# Cross join over large inputs - don't distribute.
query T
SELECT info FROM [EXPLAIN SELECT * FROM kv AS t1, kv AS t2 WHERE t1.k>1 AND t2.k>1] WHERE info LIKE 'distribution%'
----
distribution: local

# Now consider the inputs small.
statement ok
SET distribute_join_row_count_threshold = 100000;

# Hash join over small inputs - don't distribute.
query T
SELECT info FROM [EXPLAIN SELECT * FROM kv AS t1, kv AS t2 WHERE t1.k = t2.k+1 AND t1.k>1 AND t2.k>1] WHERE info LIKE 'distribution%'
----
distribution: local

statement ok
RESET distribute_join_row_count_threshold;

statement ok
CREATE TABLE kw (k INT PRIMARY KEY, w INT);
ALTER TABLE kw SPLIT AT SELECT i FROM generate_series(1,5) AS g(i);
ALTER TABLE kw EXPERIMENTAL_RELOCATE SELECT ARRAY[i], i FROM generate_series(1, 5) as g(i)

# Join - distribute.
# Large hash join - distribute.
query T
SELECT info FROM [EXPLAIN SELECT * FROM kv NATURAL JOIN kw] WHERE info LIKE 'distribution%'
----
distribution: full

# Join with the data living on the remote node - distribute.
# Large hash join with the data living on the remote node - distribute.
query T
SELECT info FROM [EXPLAIN SELECT * FROM kv NATURAL JOIN kw WHERE k=2] WHERE info LIKE 'distribution%'
SELECT info FROM [EXPLAIN SELECT * FROM kv NATURAL JOIN kw WHERE k=2 OR k>5] WHERE info LIKE 'distribution%'
----
distribution: full

Expand Down
29 changes: 29 additions & 0 deletions pkg/sql/opt/exec/execbuilder/testdata/distsql_join
Original file line number Diff line number Diff line change
Expand Up @@ -282,3 +282,32 @@ EXPLAIN (VEC)
)
AND c_custkey = o_custkey;
RESET vectorize

statement ok
CREATE TABLE kv (k INT PRIMARY KEY, v INT);
ALTER TABLE kv SPLIT AT SELECT i FROM generate_series(1,5) AS g(i);
ALTER TABLE kv EXPERIMENTAL_RELOCATE SELECT ARRAY[i], i FROM generate_series(1, 5) as g(i);

statement ok
ALTER TABLE kv INJECT STATISTICS '[
{
"columns": ["k"],
"created_at": "2024-01-01 1:00:00.00000+00:00",
"row_count": 1000,
"distinct_count": 1000
}
]';

# Force the distribution of the next plan.
statement ok
SET distribute_scan_row_count_threshold = 1;

# Verify that when merging plans where each has a single result router (n3 and
# n4), we plan the joiner on the same node as the right input (n4).
query T
SELECT * FROM [EXPLAIN (DISTSQL) SELECT * FROM kv AS t1, kv AS t2 WHERE t1.k = 3 AND t2.k = 4] WHERE info LIKE 'Diagram%';
----
Diagram: https://cockroachdb.github.io/distsqlplan/decode.html#eJycklFv2jAUhd_3K6wrTYXJjDjJk6VJoJKp2Sh0BGmTKlR55EKthDizHboK8d-nhLKSqKTd_BDl2sffPcf2DsyvFDhEwTi4nJMP5PNsek1ugx8342E4IZ1RGM2jb-MuqQuSLRlGxDJ6_HPJ96tgFpCOZR8T8ol4XTKcjEjHulXpdxdPApmtFBmHXwNyMZJircXm_QVQyFSME7FBA_wWGFDwgIIPCwq5Vks0RulyaVcJw_g3cIeCzPLCltMLCkulEfgOrLQpAoeJ6qm8X1JitEKmlWxPQRX2eZOxYo3A_ZMu4Qi4t6cnjVh7o7n4meIMRYy679TaQbIdJNu7PMFHoHCp0mKTGU4SSrZAIcpFWfW9vgPnjLGGMadmzH27MfYfxvwWY27DGDtr7NlPkSkdo8a4eSevS15IdyXM_RclM9R9tx5uWlhOBowOXDrw6MA_G8JrhHBrIV55XzM0ucoMvumBOY1OPVZGwniNhyMyqtBLvNFqWWkP5bQCVRMxGntY9Q9FmB2XjNUoNn8fxymJtZK8Gom1ktx_ILmnJNYkea0k53w6tzyxVaoe7mQMHJyn0XvhcxxQbhBrU15bdK8eKuz8MS8PfSVSgxSuRYIjtKg3MpPGyiVwqwvc79_9CQAA__-phZ3F

statement ok
RESET distribute_scan_row_count_threshold;
4 changes: 4 additions & 0 deletions pkg/sql/opt/exec/factory.opt
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ define HashJoin {
LeftEqColsAreKey bool
RightEqColsAreKey bool
ExtraOnCond tree.TypedExpr
EstimatedLeftRowCount uint64
EstimatedRightRowCount uint64
}

# MergeJoin runs a merge join.
Expand All @@ -118,6 +120,8 @@ define MergeJoin {
ReqOrdering exec.OutputOrdering
LeftEqColsAreKey bool
RightEqColsAreKey bool
EstimatedLeftRowCount uint64
EstimatedRightRowCount uint64
}

# GroupBy runs an aggregation. A set of aggregations is performed for each group
Expand Down
6 changes: 4 additions & 2 deletions pkg/sql/opt_exec_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,7 @@ func (ef *execFactory) ConstructHashJoin(
leftEqCols, rightEqCols []exec.NodeColumnOrdinal,
leftEqColsAreKey, rightEqColsAreKey bool,
extraOnCond tree.TypedExpr,
estimatedLeftRowCount, estimatedRightRowCount uint64,
) (exec.Node, error) {
p := ef.planner
leftSrc := asDataSource(left)
Expand All @@ -435,7 +436,7 @@ func (ef *execFactory) ConstructHashJoin(
pred.leftEqKey = leftEqColsAreKey
pred.rightEqKey = rightEqColsAreKey

return p.makeJoinNode(leftSrc, rightSrc, pred), nil
return p.makeJoinNode(leftSrc, rightSrc, pred, estimatedLeftRowCount, estimatedRightRowCount), nil
}

// ConstructApplyJoin is part of the exec.Factory interface.
Expand All @@ -459,13 +460,14 @@ func (ef *execFactory) ConstructMergeJoin(
leftOrdering, rightOrdering colinfo.ColumnOrdering,
reqOrdering exec.OutputOrdering,
leftEqColsAreKey, rightEqColsAreKey bool,
estimatedLeftRowCount, estimatedRightRowCount uint64,
) (exec.Node, error) {
var err error
p := ef.planner
leftSrc := asDataSource(left)
rightSrc := asDataSource(right)
pred := makePredicate(joinType, leftSrc.columns, rightSrc.columns, onCond)
node := p.makeJoinNode(leftSrc, rightSrc, pred)
node := p.makeJoinNode(leftSrc, rightSrc, pred, estimatedLeftRowCount, estimatedRightRowCount)
pred.leftEqKey = leftEqColsAreKey
pred.rightEqKey = rightEqColsAreKey

Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/sessiondatapb/local_only_session_data.proto
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,10 @@ message LocalOnlySessionData {
// AlwaysDistributeFullScans determines whether full table scans always force
// the plan to be distributed, regardless of the estimated row count.
bool always_distribute_full_scans = 148;
// DistributeJoinRowCountThreshold is the minimum number of rows estimated to
// be processed from both inputs by the hash or merge join so that we choose
// to distribute the plan because of this joiner stage of DistSQL processors.
uint64 distribute_join_row_count_threshold = 149;

///////////////////////////////////////////////////////////////////////////
// WARNING: consider whether a session parameter you're adding needs to //
Expand Down
Loading

0 comments on commit d8de5b8

Please sign in to comment.