[SPARK-30598][SQL] Detect equijoins better #27309

Closed

Conversation

peter-toth
Contributor

@peter-toth peter-toth commented Jan 21, 2020

What changes were proposed in this pull request?

This PR extracts equalities from join conditions so that we can recognise implicit equijoins.

E.g. this example query:

SELECT * FROM t1 FULL OUTER JOIN t2 ON t1.c2 = 2 AND t2.c2 = 2

has the following plan currently:

BroadcastNestedLoopJoin BuildRight, FullOuter, ((c2#226 = 2) AND (c2#237 = 2))
:- *(1) Project [_1#220 AS c#225, _2#221 AS c2#226]
:  +- *(1) LocalTableScan [_1#220, _2#221]
+- BroadcastExchange IdentityBroadcastMode, [id=#146]
   +- *(2) Project [_1#231 AS c#236, _2#232 AS c2#237]
      +- *(2) LocalTableScan [_1#231, _2#232]

But if we take

SELECT * FROM t1 FULL OUTER JOIN t2 ON t1.c2 = 2 AND t2.c2 = 2 AND t1.c2 = t2.c2

where the equality between the sides is explicitly defined (t1.c2 = t2.c2), the plan is:

SortMergeJoin [c#225], [c#236], FullOuter, ((c2#226 = 2) AND (c2#237 = 2))
:- *(2) Sort [c#225 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(c#225, 5), true, [id=#101]
:     +- *(1) Project [_1#220 AS c#225, _2#221 AS c2#226]
:        +- *(1) LocalTableScan [_1#220, _2#221]
+- *(4) Sort [c#236 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(c#236, 5), true, [id=#106]
      +- *(3) Project [_1#231 AS c#236, _2#232 AS c2#237]
         +- *(3) LocalTableScan [_1#231, _2#232]

The second plan is better because SMJ doesn't have the broadcast size limitation that BNLJ does.
After this PR the implicit equalities are detected and the first query has the same plan as the second.
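
To illustrate the idea, here is a rough Python sketch (not the Catalyst code in this PR): when a left-side column and a right-side column are both compared to the same constant, the pair can be treated as an equi-join key, while the original predicates stay in the join condition. The tuple encoding of the conjuncts and the name `infer_equi_keys` are made up for this example.

```python
from collections import defaultdict

# Hypothetical encoding: each conjunct is (side, column, constant),
# e.g. ("left", "c2", 2) stands for `t1.c2 = 2`.
def infer_equi_keys(conjuncts):
    """Return (left_column, right_column) pairs implied by a shared constant."""
    by_constant = defaultdict(lambda: {"left": [], "right": []})
    for side, column, constant in conjuncts:
        by_constant[constant][side].append(column)

    keys = []
    for columns in by_constant.values():
        # Every left column equal to the constant is also equal to every
        # right column equal to the same constant.
        for left_column in columns["left"]:
            for right_column in columns["right"]:
                keys.append((left_column, right_column))
    return keys

# t1.c2 = 2 AND t2.c2 = 2  implies  t1.c2 = t2.c2
print(infer_equi_keys([("left", "c2", 2), ("right", "c2", 2)]))
# Different constants imply nothing.
print(infer_equi_keys([("left", "c2", 2), ("right", "c2", 3)]))
```

The inferred pair is only used as the join key; the `c2 = 2` predicates still have to be evaluated as the residual condition, as in the SortMergeJoin plan above.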

Why are the changes needed?

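Improve stability: queries with implicit equijoin conditions can be planned as SMJ, so they no longer depend on BNLJ and its broadcast size limit.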

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Existing and new UTs.

@peter-toth peter-toth force-pushed the SPARK-30598-detect-equijoins-better branch from 8184b1b to ca782e7 Compare January 21, 2020 15:55
@SparkQA

SparkQA commented Jan 21, 2020

Test build #117190 has finished for PR 27309 at commit 8184b1b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 21, 2020

Test build #117191 has finished for PR 27309 at commit ca782e7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 21, 2020

Test build #117195 has finished for PR 27309 at commit 6dfd8c0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu
Member

maropu commented Jan 22, 2020

Is this query (and the optimization) useful for users? Actually, it seems pgsql doesn't allow it:

postgres=# SELECT * FROM t1 FULL OUTER JOIN t2 ON t1.c2 = 2 AND t2.c2 = 2;
ERROR:  FULL JOIN is only supported with merge-joinable or hash-joinable join conditions

@JoshRosen
Contributor

To double-check / restate my own understanding of the example query:

For inner joins:

If we had an inner join of the form

SELECT * FROM t1 JOIN t2 ON t1.c2 = 2 AND t2.c2 = 2

then that's effectively a cross-join (because we'll push each c2 = 2 filter beneath its corresponding side of the join). There's not a whole lot of room to improve our join execution strategy in this case (this PR's changes wouldn't help us).

For full outer joins:

Given

SELECT * FROM t1 FULL OUTER JOIN t2 ON t1.c2 = 2 AND t2.c2 = 2

then the optimization proposed in this PR would cause us to hash partition rows according to c2: the records where c2 = 2 will wind up in a single reducer / partition, while the other (non-join-matching) records would be spread among reducers according to the hash partitioning.
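
A toy illustration of that distribution, assuming a stand-in partitioner (Python's `hash()` modulo the partition count; Spark's hash partitioning uses a different hash function) and made-up rows of the form `(c, c2)`:

```python
from collections import defaultdict

NUM_PARTITIONS = 5  # same count as hashpartitioning(..., 5) in the PR description

def partition_of(key, num_partitions=NUM_PARTITIONS):
    # Stand-in partitioner, for illustration only.
    return hash(key) % num_partitions

# Made-up rows (c, c2); c2 cycles through 0..6.
rows = [(c, c % 7) for c in range(21)]

partitions = defaultdict(list)
for row in rows:
    partitions[partition_of(row[1])].append(row)

# All join-matching rows (c2 = 2) share one partition...
matching_partitions = {partition_of(c2) for _, c2 in rows if c2 == 2}
# ...while the non-matching rows are spread over several.
other_partitions = {partition_of(c2) for _, c2 in rows if c2 != 2}

print(matching_partitions)
print(sorted(other_partitions))
```

Since every `c2 = 2` row lands in the same partition, that reducer handles the whole cross product of matching rows.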

Prior to this patch, our only choice was to plan this query as a broadcast nested loop join (BNLJ): if we exceeded the broadcast size limit then this query would fail. As a result, this PR's changes end up raising the limit on the size of data we can query.

However, I think that this change might slightly regress performance in cases where one side of the join is very small: Spark currently doesn't support broadcast hash join for full outer joins, so queries which previously could fit as broadcast nested loop joins would instead become sort-merge joins.

Sidebar: a multi-pass approach:

I think that it might be possible to rewrite

SELECT * FROM t1 FULL OUTER JOIN t2 ON t1.c2 = 2 AND t2.c2 = 2

as

(SELECT * FROM t1 CROSS JOIN t2 WHERE t1.c2 = 2 AND t2.c2 = 2)
UNION ALL
(SELECT t1.*, <nulls> FROM t1 WHERE NOT (t1.c2 <=> 2))
UNION ALL
(SELECT <nulls>, t2.* FROM t2 WHERE NOT (t2.c2 <=> 2))

(note the use of null-safe equals)

That has the advantage of avoiding a shuffle for the non-matching rows at the cost of needing to scan each join input twice (since we don't have a great way to emit multiple output streams from a single task). This could also be helpful in case c2 has skew for values other than 2 (since it avoids hash partitioning on a skew column).

Automatically picking that plan is hard without really good cost-based optimization, though.
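
As a sanity check of the rewrite, here is a small pure-Python simulation (not Spark code; the helper names and sample rows are made up). It compares a naive full outer join against the three-branch UNION ALL on rows of the form `(c, c2)`, using `None` for NULL. It also surfaces a caveat: if one side has no `c2 = 2` row at all, the rewrite drops the other side's `c2 = 2` rows, which the full outer join would keep null-padded, so the rewrite as written seems to need an extra guard for that case.

```python
from collections import Counter

NULLS = (None, None)  # null padding for one two-column side

def matches(row):
    # SQL `c2 = 2`: NULL never satisfies the predicate.
    return row[1] is not None and row[1] == 2

def null_safe_eq(a, b):
    # SQL `<=>`: always true/false; NULL <=> NULL is true.
    return a == b

def full_outer_join(t1, t2):
    """Naive FULL OUTER JOIN ... ON t1.c2 = 2 AND t2.c2 = 2."""
    out, matched_right = [], set()
    for left in t1:
        hit = False
        for j, right in enumerate(t2):
            if matches(left) and matches(right):
                out.append(left + right)
                matched_right.add(j)
                hit = True
        if not hit:
            out.append(left + NULLS)
    out += [NULLS + r for j, r in enumerate(t2) if j not in matched_right]
    return out

def union_all_rewrite(t1, t2):
    """The three UNION ALL branches from the sidebar."""
    cross = [l + r for l in t1 for r in t2 if matches(l) and matches(r)]
    left_only = [l + NULLS for l in t1 if not null_safe_eq(l[1], 2)]
    right_only = [NULLS + r for r in t2 if not null_safe_eq(r[1], 2)]
    return cross + left_only + right_only

def same_rows(a, b):
    return Counter(a) == Counter(b)  # compare as multisets, order-insensitive

# Both sides contain a c2 = 2 row: the two plans agree.
t1 = [("a", 2), ("b", 3), ("c", None)]
t2 = [("x", 2), ("y", 2), ("z", None)]
print(same_rows(full_outer_join(t1, t2), union_all_rewrite(t1, t2)))

# The right side has no c2 = 2 row: the rewrite loses ("a", 2, NULL, NULL).
t1_only = [("a", 2)]
t2_none = [("x", 3)]
print(same_rows(full_outer_join(t1_only, t2_none), union_all_rewrite(t1_only, t2_none)))
```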

For left joins:

AFAIK the potential drawbacks for full outer joins (loss of broadcast join in cases where data is really tiny) don't apply to left joins (since we'd still be able to plan broadcast hash joins), so it seems like this effectively raises the scale limit by giving us an alternative to broadcast nested loop join when the data is very large.

For other join types:

I haven't considered any other join types.

Summary:

For joins where columns of different tables are related via being equal to the same constant value, it looks like this PR's changes give us an alternative to BNLJ in situations where the data is very large.

@peter-toth, do you have a motivating use-case / more realistic example of where this query pattern occurs? My initial feeling is that this seems like a pretty niche optimization and it's not clear whether this occurs often enough in real queries to warrant the added complexity and potential corner-cases.

@JoshRosen JoshRosen added the SQL label Jan 22, 2020
@peter-toth
Contributor Author

@maropu I don't know why pgsql doesn't allow it, but Spark SQL does and the query makes sense. IMHO the 2 queries:
SELECT * FROM t1 FULL OUTER JOIN t2 ON t1.c2 = 2 AND t2.c2 = 2
and
SELECT * FROM t1 FULL OUTER JOIN t2 ON t1.c2 = 2 AND t2.c2 = 2 AND t1.c2 = t2.c2
should have the same plan using SMJ to avoid BNLJ.

@JoshRosen thanks for the detailed comment. This is a niche optimization indeed. Let me share why I raised this PR. I have another WIP PR here: #24553, and in its last commit I started playing with enabling constant propagation on join conditions too. I believe it could be beneficial on some niche inner joins, e.g. SELECT * FROM t1 JOIN t2 ON t1.c2 = 2 AND t2.c2 = 2 AND t1.c2 = t2.c2, as it could turn BHJ into BNLJ (no need for hashing) or SMJ into CartesianProduct (no need for sorting).
But that PR has a downside as well: it optimizes away the t1.c2 = t2.c2 expression on full outer joins. This PR seemed to be a good way to solve that issue and also improve full outer join queries where the equality is not specified explicitly.

@peter-toth
Contributor Author

@maropu, @JoshRosen I updated the description to reflect where this PR has a benefit. I believe the 2 selects should generate the same plan using SMJ, as it is safer and the required change is pretty small. Please let me know if the change makes sense. If you think it doesn't, then I will close the PR.

@github-actions

github-actions bot commented May 3, 2020

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

4 participants