feat: Add projection to HashJoinExec. #9236
The results on my PC are unstable; sometimes it gets slower 😅.
@@ -57,12 +57,11 @@ Limit: skip=0, fetch=5
physical_plan
GlobalLimitExec: skip=0, fetch=5
--SortPreservingMergeExec: [a@0 ASC NULLS LAST], fetch=5
----ProjectionExec: expr=[a@1 as a]
A typical example here.
Nice, could you run/post the |
These are my results for tpch_mem; it seems to be a small but reasonable speedup 🚀:
Thanks, @Dandandan. Currently, I don't project equivalence_properties and output_ordering, so some optimizer rules don't work after embedding the projection into HashJoinExec. I am trying to handle it.
Done! I will add more docs tomorrow.
@metesynnada PTAL
@@ -217,6 +217,8 @@ fn roundtrip_hash_join() -> Result<()> {
on.clone(),
None,
join_type,
// TODO: add a projectionexec for projection in the join
Please ignore this comment, I will remove it later.
@@ -274,5 +274,5 @@ query PI
SELECT DATE_TRUNC('minute', to_timestamp_seconds("EventTime")) AS M, COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-14' AND "EventDate"::INT::DATE <= '2013-07-15' AND "IsRefresh" = 0 AND "DontCountHits" = 0 GROUP BY DATE_TRUNC('minute', to_timestamp_seconds("EventTime")) ORDER BY DATE_TRUNC('minute', M) LIMIT 10 OFFSET 1000;
----

query
This was changed automatically by `cargo test --test sqllogictests -- --complete`.
@@ -645,14 +751,23 @@ impl ExecutionPlan for HashJoinExec {
// over the right that uses this information to issue new batches.
let right_stream = self.right.execute(partition, context)?;

// update column indices to reflect the projection
let column_indices_after_projection = match &self.projection {
Project `column_indices` so that `build_batch_from_indices` can skip unused columns.
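As a rough illustration of this step, the sketch below applies an optional projection to a list of column indices. `ColumnIndex` and `JoinSide` here are simplified stand-ins defined locally, and `project_column_indices` is a hypothetical helper name, not the function used in this PR.

```rust
/// Which join input an output column comes from.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum JoinSide {
    Left,
    Right,
}

/// Simplified stand-in for the join's per-output-column bookkeeping:
/// the input side and the column position within that side.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ColumnIndex {
    index: usize,
    side: JoinSide,
}

/// Hypothetical helper: keep only the entries selected by `projection`,
/// in projection order. With no projection, every entry is kept.
fn project_column_indices(
    column_indices: &[ColumnIndex],
    projection: Option<&[usize]>,
) -> Vec<ColumnIndex> {
    match projection {
        Some(projection) => projection
            .iter()
            .map(|&i| column_indices[i].clone())
            .collect(),
        None => column_indices.to_vec(),
    }
}
```

With the projected list, a batch builder only has to visit the columns that survive the projection, instead of every column of the join schema.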
@@ -282,7 +284,7 @@ pub struct HashJoinExec {
pub filter: Option<JoinFilter>,
/// How the join is performed (`OUTER`, `INNER`, etc)
pub join_type: JoinType,
/// The output schema for the join
/// The schema after join
- Add projection.
- Update `equivalence_properties` and `output_ordering` after projection.
- Update `column_indices`.
- Keep `HashJoinExec.schema` as the result of `build_join_schema`; the final schema after projection is available through the `schema()` function, so we need to be careful when using it.
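The last point can be pictured with a small sketch. `JoinSketch` is a hypothetical, simplified type (column names instead of an Arrow schema), used only to show the difference between the stored join schema and what `schema()` returns once a projection is embedded.

```rust
/// Hypothetical, simplified stand-in for the join operator.
struct JoinSketch {
    /// Full schema produced by the join: left columns followed by right columns.
    join_schema: Vec<String>,
    /// Optional embedded projection: indices into `join_schema`.
    projection: Option<Vec<usize>>,
}

impl JoinSketch {
    /// Output schema seen by parent operators: `join_schema` with the
    /// projection applied, or `join_schema` itself when none is set.
    fn schema(&self) -> Vec<String> {
        match &self.projection {
            Some(p) => p.iter().map(|&i| self.join_schema[i].clone()).collect(),
            None => self.join_schema.clone(),
        }
    }
}
```

Code that indexes into the stored field sees the unprojected layout, while callers of `schema()` see the projected one, which is why mixing the two needs care.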
Same here; I'm planning to take a closer look tomorrow. The idea in general looks good, though. Thank you @my-vegetable-has-exploded
@@ -97,6 +97,8 @@ impl PhysicalOptimizer {
// Note that one should always run this rule after running the EnforceDistribution rule
// as the latter may break local sorting requirements.
Arc::new(EnforceSorting::new()),
// TODO: `try_embed_to_hash_join` in the ProjectionPushdown rule would be block by the CoalesceBatches, so add it before CoalesceBatches. Maybe optimize it in the future.
Arc::new(ProjectionPushdown::new()),
Do we need two `ProjectionPushdown` rules? Can the original be removed?
AFAICT the original will be modified to account for the new built-in projection capability and this one will be removed
the original will be modified to account for th
Does this refer to #9111?
I guess so.
It looks good overall. I only have a few comments.
Thank you all for the review. @Dandandan @metesynnada @korowa @viirya
Thank you @my-vegetable-has-exploded !
Since datafusion's apache/datafusion#9236, HashJoinExec can also project.
* Upgrading Ballista to datafusion 37.0.0.
* Better test debugging information in planner.rs
* Updated test logic in planner. Since datafusion's apache/datafusion#9236, HashJoinExec can also project.
* cargo fmt
* cargo fix
* Removed leftover comment
* Make cargo clippy happy
* lint
* Cargo fmt
* Fix tpch build
* Fix comment spelling
* cargo fmt
Which issue does this PR close?
ref #6768
Rationale for this change
Some projections can't be pushed down to the left or right input of a hash join because `filter` or `on` may need columns that won't be used later. By embedding those projections into the hash join, we can reduce the cost of `build_batch_from_indices` in the hash join (`build_batch_from_indices` needs to call `compute::take()` for each column) and avoid unnecessary output creation.
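To make the cost argument concrete, here is a minimal, std-only sketch of an inner hash join with an optional embedded projection. It is not the DataFusion implementation: `Batch`, `take`, and `hash_join` are illustrative names, columns are plain `Vec<i64>`, and `take` only mimics what Arrow's `compute::take()` does. The function also returns how many `take` calls it made, so the saving is visible.

```rust
use std::collections::HashMap;

/// A "batch" is a list of columns; each column is a vector of values.
type Batch = Vec<Vec<i64>>;

/// Stand-in for a gather kernel: pick `column[i]` for each index `i`.
fn take(column: &[i64], indices: &[usize]) -> Vec<i64> {
    indices.iter().map(|&i| column[i]).collect()
}

/// Inner equi-join on `left[left_key] == right[right_key]`.
/// The join schema is all left columns followed by all right columns;
/// `projection` selects which of those columns are materialized.
/// Returns the output batch and the number of `take` calls performed.
fn hash_join(
    left: &Batch,
    left_key: usize,
    right: &Batch,
    right_key: usize,
    projection: Option<&[usize]>,
) -> (Batch, usize) {
    // Build phase: map each left key to the left rows that carry it.
    let mut table: HashMap<i64, Vec<usize>> = HashMap::new();
    for (row, &key) in left[left_key].iter().enumerate() {
        table.entry(key).or_default().push(row);
    }

    // Probe phase: collect matching (left row, right row) index pairs.
    let mut left_idx = Vec::new();
    let mut right_idx = Vec::new();
    for (r_row, key) in right[right_key].iter().enumerate() {
        if let Some(rows) = table.get(key) {
            for &l_row in rows {
                left_idx.push(l_row);
                right_idx.push(r_row);
            }
        }
    }

    // Output phase: one `take` per materialized column, so a narrower
    // projection directly means fewer gathers.
    let all: Vec<usize> = (0..left.len() + right.len()).collect();
    let selected = projection.unwrap_or(&all);
    let mut takes = 0;
    let output: Batch = selected
        .iter()
        .map(|&col| {
            takes += 1;
            if col < left.len() {
                take(&left[col], &left_idx)
            } else {
                take(&right[col - left.len()], &right_idx)
            }
        })
        .collect();
    (output, takes)
}
```

In this sketch the key columns are still read during the build and probe phases, but when the projection excludes them they are never gathered into the output, which is the case a projection placed below the join cannot cover.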
What changes are included in this PR?
Add a rule `try_embed_to_hash_join` in `physical_optimizer/projection_pushdown.rs`. More related changes are noted in the comments.
Are these changes tested?
Are there any user-facing changes?
None