[ARROW-12441] [DataFusion] Cross join implementation #11
Codecov Report
@@ Coverage Diff @@
## master #11 +/- ##
==========================================
+ Coverage 70.41% 70.53% +0.12%
==========================================
Files 123 124 +1
Lines 21261 21453 +192
==========================================
+ Hits 14970 15132 +162
- Misses 6291 6321 +30
Amazing! :D Since this will land on the next release, I suggest that we add / migrate the corresponding JIRA issue here, so that we can map issues <> features, bugs, etc. to create a change log.
I think this is looking very cool -- thanks @Dandandan
I think the input handling of the cross join may be reversed from the intention
/// left (build) side which gets loaded in memory
left: Arc<dyn ExecutionPlan>,
/// right (probe) side which are combined with left side
right: Arc<dyn ExecutionPlan>,
My reading of the code below is that it is the right side that is loaded into memory and then the left is streamed through.
I think the changes to the planner in this PR are trying to send the smaller input to the left-hand input, so it might make sense to change the implementation to buffer the left side rather than the right side.
relevant code:
let merge = MergeExec::new(self.left.clone());
The left side should be loaded into memory (that is the intention, and I am pretty sure that is what we are doing?), and we try to make the smaller side the left side as far as possible.
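To make the "smaller side on the left" idea concrete, here is a minimal sketch, not the planner code from this PR. The `Input` struct, its `estimated_rows` field and the `order_cross_join_inputs` function are hypothetical names made up for illustration; a real planner would work on logical plans and their statistics.

```rust
/// Hypothetical summary of a join input; this is NOT a DataFusion type.
#[derive(Debug, Clone, PartialEq)]
struct Input {
    name: String,
    /// Estimated row count, if statistics are available.
    estimated_rows: Option<usize>,
}

/// Returns `(left, right, swapped)`.
/// The inputs are swapped only when both sizes are known and the second
/// input is strictly smaller, so the buffered (left) side is the small one.
fn order_cross_join_inputs(a: Input, b: Input) -> (Input, Input, bool) {
    match (a.estimated_rows, b.estimated_rows) {
        (Some(l), Some(r)) if r < l => (b, a, true),
        // Unknown or equal sizes: keep the order the query was written in.
        _ => (a, b, false),
    }
}
```

One thing to keep in mind with this kind of swap: a cross join emits the left columns followed by the right columns, so a caller that swaps the inputs would presumably also need to restore the original column order afterwards (the `swapped` flag is there for that).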
I rewrote master to remove the history of the other languages (and 50MB of history). This means this PR will need to be "rebased" against the current master. The best way I found to do this was to find the relevant commits (via
@Dandandan I am happy to rebase this PR against new master. Let me know if that would be helpful
Looks like this is updated too
@alamb I updated this and replied with some comments on the locking for the reference to the record batch |
This is a first (naive, but probably not that bad) implementation of the cross join and CROSS JOIN syntax.
https://issues.apache.org/jira/browse/ARROW-12441
The left side gets loaded into memory; the right side is streamed and combined with the left side.
To keep memory usage down, we keep a "cursor" into the values on the left side and produce the output batches one by one instead of concatenating the result of the full cartesian product.
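As a rough illustration of the cursor idea (a simplified sketch, not the code in this PR), the example below uses plain `Vec<i64>` values as a stand-in for Arrow record batches. The names `Batch`, `CrossJoinStream` and `left_cursor` are assumptions made up for the example.

```rust
/// Stand-in for a record batch: a single column of values.
/// (Hypothetical; the real implementation works on Arrow `RecordBatch`es.)
type Batch = Vec<i64>;

/// Lazily yields the cross join of a buffered left side and a streamed right side.
struct CrossJoinStream<I: Iterator<Item = Batch>> {
    /// Build side: all left rows, loaded into memory up front.
    left: Vec<i64>,
    /// Probe side: pulled one batch at a time.
    right: I,
    /// The right batch currently being combined with the left rows.
    current_right: Option<Batch>,
    /// Index of the next left row to combine with `current_right`.
    left_cursor: usize,
}

impl<I: Iterator<Item = Batch>> CrossJoinStream<I> {
    fn new(left_batches: Vec<Batch>, right: I) -> Self {
        let left = left_batches.into_iter().flatten().collect();
        Self { left, right, current_right: None, left_cursor: 0 }
    }
}

impl<I: Iterator<Item = Batch>> Iterator for CrossJoinStream<I> {
    /// One output batch: a single left row paired with every row of one right batch.
    type Item = Vec<(i64, i64)>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.left.is_empty() {
            return None; // cross join with an empty side is empty
        }
        loop {
            // Pull the next right batch when none is active; stop when the stream ends.
            if self.current_right.is_none() {
                self.current_right = Some(self.right.next()?);
                self.left_cursor = 0;
            }
            let right = self.current_right.as_ref().unwrap();
            if self.left_cursor < self.left.len() && !right.is_empty() {
                let l = self.left[self.left_cursor];
                self.left_cursor += 1;
                return Some(right.iter().map(|&r| (l, r)).collect());
            }
            // Every left row has been paired with this right batch; move on.
            self.current_right = None;
        }
    }
}
```

With a left side of `[[1, 2], [3]]` and a right side of `[[10, 20], [30]]`, the iterator yields six small batches (nine rows in total), and at no point is the full cartesian product held in memory. Each output batch here is at most as large as one right batch.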
FYI @andygrove @alamb @jorgecarleitao
This also makes query 9 run in DataFusion, though performance is not yet acceptable; I believe that is unrelated to the cross join itself and is caused by another issue.