From c48d4edcad94126af032bdf11ca9ba84cce2d9f4 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Berkay=20=C5=9Eahin?= <124376117+berkaysynnada@users.noreply.github.com>
Date: Fri, 10 Nov 2023 18:24:51 +0300
Subject: [PATCH] Projection Pushdown (#211)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* [GITHUB ACTION] Refactor for license and actions (#148)
* Delete datafusion main publication
* Adding license information, refactoring prunability issues
* Update SYNNADA-CONTRIBUTIONS.txt
* Update rat_exclude_files.txt
* Enhanced Pipeline Execution: Now Supporting Complex Query Plans for Improved Performance (#132)
* Very initial test-passing algorithm
* Working except a minor bug in interval calculations
* After clippy
* Plan
* Initial implementation
* Before prune-check ability is added. Order equivalence implementations will vanish after we send a separate PR
* Minor changes
* Fix bug, ordering equivalence random head
* Minor changes
* Add ordering equivalence for sort merge join
* Improvement on tests
* Upstream changes
* Add ordering equivalence for sort merge join
* Fmt issues
* Update comment
* Add ordering equivalence support for hash join
* Make 1 file
* Code enhancements/comment improvements
* Add projection cast handling
* Fix output ordering for sort merge join
* Projection bug fix
* Minor changes
* Minor changes
* Simplify sort_merge_join
* Update equivalence implementation
* Update test_utils.rs
* Update cast implementation
* More idiomatic code
* After merge
* Comments visited
* Add key swap according to the children orders
* Refactoring
* After-merge refactor
* Update sort_enforcement.rs
* Update datafusion/core/src/physical_optimizer/join_selection.rs

Co-authored-by: Mustafa Akur <106137913+mustafasrepo@users.noreply.github.com>

* Comments are applied
* Feature/determine prunability (#139)
* Ready to review
* License added
* Simplifications
* Simplifications
* Sort exprs are taken separately for each table
* We can return the sort info of the expression now
* Check filter conditions
* Simplifications
* Simplifications
* Functions are implemented for SortInfo calculations
* Node-specialized tableSide functions
* NotImplemented errors are added, test comments are added
* Comment change
* Simplify comparison node calculations
* Simplifications and better commenting
* is_prunable function is updated with the new Prunability function
* Indices of sort expressions are updated with intermediate schema columns of the filter
* Unused function is removed
* Future-proof index updating
* An if-let check is removed
* Simplifications
* Simplifications
* Simplifications
* Change if condition
* Determine prunability of tables for join operations (#90)

---------

Co-authored-by: Mustafa Akur
Co-authored-by: Mehmet Ozan Kabak

* Fix the tables' unboundedness

---------

Co-authored-by: Mustafa Akur
Co-authored-by: Mehmet Ozan Kabak

* Comment improvements and minor code improvements
* Splitting the order-based join selection
* Update rat_exclude_files.txt
* Revert "Feature/determine prunability (#139)"

This reverts commit cf56105021edcf2c2d2efa2f0d3f8febb80b2a0d.
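The patch subject, projection pushdown, is only named in the title above, so here is a minimal, hedged sketch of the idea: walk the plan top-down, collect the columns each operator actually needs, and shrink the leaf scan to that set. The `Plan` enum, `pushdown` function, and column names below are invented for illustration; this is not the DataFusion implementation.

```rust
use std::collections::BTreeSet;

#[derive(Debug, Clone, PartialEq)]
enum Plan {
    /// Leaf scan; reads only the listed columns.
    Scan { columns: Vec<String> },
    /// Filter whose predicate references the listed columns.
    Filter { predicate_cols: Vec<String>, input: Box<Plan> },
    /// Projection keeping only the listed columns.
    Projection { columns: Vec<String>, input: Box<Plan> },
}

/// Push the column requirements of ancestor operators down to the scan.
fn pushdown(plan: Plan, required: &BTreeSet<String>) -> Plan {
    match plan {
        Plan::Scan { columns } => Plan::Scan {
            // Read only what some ancestor actually needs.
            columns: columns
                .into_iter()
                .filter(|c| required.contains(c))
                .collect(),
        },
        Plan::Filter { predicate_cols, input } => {
            // The filter adds its own column requirements.
            let mut needed = required.clone();
            needed.extend(predicate_cols.iter().cloned());
            Plan::Filter {
                predicate_cols,
                input: Box::new(pushdown(*input, &needed)),
            }
        }
        Plan::Projection { columns, input } => {
            // Below a projection, only the projected columns matter.
            let needed: BTreeSet<String> = columns.iter().cloned().collect();
            Plan::Projection {
                columns,
                input: Box::new(pushdown(*input, &needed)),
            }
        }
    }
}

fn main() {
    // SELECT a FROM t WHERE <predicate on b>; the scan initially reads a, b, c.
    let plan = Plan::Projection {
        columns: vec!["a".to_string()],
        input: Box::new(Plan::Filter {
            predicate_cols: vec!["b".to_string()],
            input: Box::new(Plan::Scan {
                columns: vec!["a".to_string(), "b".to_string(), "c".to_string()],
            }),
        }),
    };
    // After pushdown the scan reads only a and b; c is pruned.
    println!("{:?}", pushdown(plan, &BTreeSet::new()));
}
```

The real optimizer must also rewrite column indices as schemas shrink (as the "Indices of sort expressions are updated with intermediate schema columns of the filter" commit hints), which this sketch sidesteps by using names.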
* Commented

---------

Co-authored-by: Mustafa Akur
Co-authored-by: Mehmet Ozan Kabak
Co-authored-by: Mustafa Akur <106137913+mustafasrepo@users.noreply.github.com>
Co-authored-by: Berkay Şahin <124376117+berkaysynnada@users.noreply.github.com>

* Bug fix: Fix lexicographical column search among provided ordering (#156)
* License update (#157)

This extension adds Synnada license information to the existing one.

* Sliding Nested Join Algorithm (#142)
* Sliding Hash Join Algorithm (SWHJ) (#147)
* Fix errors introduced during rebase
* Keep Track of Global Ordering Requirement (#165)
* Prunability of Join Filter Physical Expressions (#161)
* BinaryExpr Equivalence (#116)
* Fix errors introduced during rebase
* Support multiple ordered columns on joins and expression graph (#163)
* SlidingHashJoin and SlidingNestedLoopJoin planner integration (#171)
* Add license, add contribution hash commits, minor changes, add scripts to automate hash generation, delete docs yaml file
* Resolve errors introduced during rebase
* Synnada Streaming SQL Tests (#190)
* Adds a new method to construct window function for the given input
* Protobuf implementations with round-robin tests (#193)
* Protobuf implementations with round-robin
* Proto
* Update mod.rs
* Fix linter errors, compile errors after rebase; update commit hashes, regenerate proto
* Rewrite Filter Predicate (#192)
* Global join selection (#183)
* Initial comments on working
* Adding comments
* Update sort_hash_join.rs
* After-merge silent error
* Change the query in HashJoin
* Clippy solving
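Several commits above ("Add ordering equivalence for sort merge join", "BinaryExpr Equivalence (#116)") revolve around ordering equivalence: when a join equates two columns, an output sorted on one is also sorted on the other. A hedged sketch of that check, with invented names and none of DataFusion's actual types:

```rust
use std::collections::HashSet;

/// Equivalence classes of column names, e.g. induced by equi-join keys.
type EquivClasses = Vec<HashSet<String>>;

/// True if `a` and `b` name the same column or sit in one equivalence class.
fn equivalent(classes: &EquivClasses, a: &str, b: &str) -> bool {
    a == b || classes.iter().any(|c| c.contains(a) && c.contains(b))
}

/// True if output known to be sorted on `known` also satisfies the required
/// lexicographic ordering `required`, modulo column equivalences.
fn ordering_satisfied(classes: &EquivClasses, known: &[&str], required: &[&str]) -> bool {
    required.len() <= known.len()
        && required
            .iter()
            .zip(known.iter())
            .all(|(&r, &k)| equivalent(classes, r, k))
}

fn main() {
    // A join condition a = b puts a and b into one equivalence class.
    let classes: EquivClasses =
        vec![["a".to_string(), "b".to_string()].into_iter().collect()];
    // Output sorted on (a, c) therefore also satisfies (b, c): prints true.
    println!("{}", ordering_satisfied(&classes, &["a", "c"], &["b", "c"]));
}
```

Tracking this lets the optimizer drop a redundant sort above a join whose keys make the required ordering equivalent to one it already has.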
* After merge
* Before clippy fmt etc.
* Lazy-loading tables
* Mini test
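The sliding join variants listed earlier (#142, #147, #171) earn their name from pruning: with a band filter over inputs sorted on the filter column, buffered rows that fall behind the band can never match again and can be dropped, keeping memory bounded on unbounded streams. A hedged toy illustration of that invariant (invented function, not the SlidingHashJoin code):

```rust
/// Drop buffered build rows that can no longer match any current or future
/// probe row, given a band filter `probe.t - build.t <= window` and probe
/// values that arrive in increasing `t` order.
fn prune_buffer(buffered: &mut Vec<i64>, latest_probe_t: i64, window: i64) {
    // A build row with t < latest_probe_t - window is outside the band for
    // this probe, and future probes only move the band further right.
    buffered.retain(|&t| t >= latest_probe_t - window);
}

fn main() {
    let mut build_side = vec![1, 5, 12, 20];
    prune_buffer(&mut build_side, 25, 10);
    println!("{:?}", build_side); // prints [20]
}
```

The prunability analysis referenced throughout this log (#90, #139, #161) is what proves, from the filter expression and the inputs' sort orders, that such a bound exists for each side.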
* Sliding Nested Join Algorithm (#142)
* Sliding Hash Join Algorithm (SWHJ) (#147)
* Fix errors introduced during rebase
* Keep Track of Global Ordering Requirement (#165)
* Prunability of Join Filter Physical Expressions (#161)
* BinaryExpr Equivalence (#116)
* Fix errors introduced during rebase
* Support multiple ordered columns on joins and expression graph (#163)
* SlidingHashJoin and SlidingNestedLoopJoin planner integration (#171)
* Add license, add contribution hash commits, minor changes
* Before rebase merge
* Add license, add contribution hash commits, minor changes, add scripts to automate hash generation, delete docs yaml file
* Update utils.rs
* Print deletion
* Update Cargo.lock
* Refactor for review
* Working without slt
* Change in test folders
* Update join_pipeline_selection.rs
* Update utils.rs
* Before clippy
* Before SLT
* Tests are passing and clippy OK.
* Resolve errors introduced during rebase
* After merge
* Update rat_exclude_files.txt
* Comments visited
* Synnada Streaming SQL Tests (#190)
* Adds a new method to construct window function for the given input
* For mustafa
* Final
* Update rat_exclude_files.txt
* More commenting
* Fix linter errors, compile errors after rebase, update commit hashes
* After merge refactors
* Dir
* Additional test for coverage
* Update join_disable_repartition_joins.slt
* Review changes, remove code duplicates
* Update subdirectory hashes --------- Co-authored-by: Berkay Şahin <124376117+berkaysynnada@users.noreply.github.com> Co-authored-by: Mustafa Akur Co-authored-by: Mehmet Ozan Kabak Co-authored-by: Mustafa Akur <106137913+mustafasrepo@users.noreply.github.com>
* work in progress
* fix after merge
* joins are ok
* code cleaning
* CaseExpr handling
* tests are updated
* Handling sequential projections
* Simplifications
* partition expr update
* ProjectionPushdown becomes a rule
* fix after merge
* remove the subrule
* tpch update
* Minor comment changes
* remove unnecessary state struct
* coalesce batches does not allow pushing the projection down
* tpch changes removed
* minor changes
* addresses reviews
* Update projection_pushdown.rs
* minor
* Review Part 1
* Simplify the API of plan handlers
* Review Part 2
* Review Part 3
* Review Part 4
* Review Part 5
* Review Part 6
* Review Part 7
* Remove duplication of
physical_expr matching * fix documentation * Minor changes * Take upstream changes --------- Co-authored-by: Metehan Yıldırım <100111937+metesynnada@users.noreply.github.com> Co-authored-by: Mustafa Akur Co-authored-by: Mehmet Ozan Kabak Co-authored-by: Mustafa Akur <106137913+mustafasrepo@users.noreply.github.com> --- .../join_pipeline_selection.rs | 102 ++++++++-------- datafusion/core/src/physical_optimizer/mod.rs | 9 +- .../physical_optimizer/pipeline_checker.rs | 2 +- .../physical_optimizer/projection_pushdown.rs | 114 +++++++++++++++++- datafusion/expr/src/signature.rs | 2 +- .../physical-expr/src/expressions/negative.rs | 1 + datafusion/physical-expr/src/functions.rs | 17 +-- .../sqllogictest/test_files/explain.slt | 1 - .../join_disable_repartition_joins.slt | 35 +++--- datafusion/sqllogictest/test_files/stream.slt | 63 +++++----- dev/release/rat_exclude_files.txt | 1 + 11 files changed, 217 insertions(+), 130 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/join_pipeline_selection.rs b/datafusion/core/src/physical_optimizer/join_pipeline_selection.rs index f5b5e43e9066..683922056d14 100644 --- a/datafusion/core/src/physical_optimizer/join_pipeline_selection.rs +++ b/datafusion/core/src/physical_optimizer/join_pipeline_selection.rs @@ -4161,21 +4161,19 @@ mod sql_fuzzy_tests { let expected_plan = [ "ProjectionExec: expr=[o_orderkey@0 as o_orderkey, LAST_VALUE(lineitem.l_suppkey) ORDER BY [lineitem.l_orderkey ASC NULLS LAST]@1 as amount_usd]", " AggregateExec: mode=Single, gby=[o_orderkey@0 as o_orderkey], aggr=[LAST_VALUE(lineitem.l_suppkey)], ordering_mode=FullyOrdered", - " ProjectionExec: expr=[o_orderkey@0 as o_orderkey, l_orderkey@2 as l_orderkey, l_suppkey@3 as l_suppkey]", - " ProjectionExec: expr=[o_orderkey@3 as o_orderkey, o_orderdate@4 as o_orderdate, l_orderkey@0 as l_orderkey, l_suppkey@1 as l_suppkey, l_shipdate@2 as l_shipdate]", - " PartitionedHashJoinExec: join_type=Inner, on=[(l_shipdate@2, o_orderdate@1)], 
filter=l_orderkey@1 < o_orderkey@0 - 10", - " ProjectionExec: expr=[l_orderkey@0 as l_orderkey, l_suppkey@1 as l_suppkey, l_shipdate@3 as l_shipdate]", - " CoalesceBatchesExec: target_batch_size=8192", - " FilterExec: l_returnflag@2 = R", - " CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/lineitem.csv]]}, projection=[l_orderkey, l_suppkey, l_returnflag, l_shipdate], infinite_source=true, output_ordering=[l_orderkey@0 ASC NULLS LAST], has_header=true", - " ProjectionExec: expr=[o_orderkey@1 as o_orderkey, o_orderdate@2 as o_orderdate]", - " SortMergeJoin: join_type=Inner, on=[(c_custkey@0, o_orderkey@0)]", - " ProjectionExec: expr=[c_custkey@0 as c_custkey]", - " ProjectionExec: expr=[c_custkey@2 as c_custkey, c_nationkey@3 as c_nationkey, n_nationkey@0 as n_nationkey, n_regionkey@1 as n_regionkey]", - " SlidingHashJoinExec: join_type=Inner, on=[(n_regionkey@1, c_nationkey@1)], filter=n_nationkey@1 > c_custkey@0 AND n_nationkey@1 < c_custkey@0 + 20", - " CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/nation.csv]]}, projection=[n_nationkey, n_regionkey], infinite_source=true, output_ordering=[n_nationkey@0 ASC NULLS LAST], has_header=true", - " CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/customer.csv]]}, projection=[c_custkey, c_nationkey], infinite_source=true, output_ordering=[c_custkey@0 ASC NULLS LAST], has_header=true", - " CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/orders.csv]]}, projection=[o_orderkey, o_orderdate], infinite_source=true, output_ordering=[o_orderkey@0 ASC NULLS LAST], has_header=true", + " ProjectionExec: expr=[o_orderkey@3 as o_orderkey, l_orderkey@0 as l_orderkey, l_suppkey@1 as l_suppkey]", + " PartitionedHashJoinExec: join_type=Inner, on=[(l_shipdate@2, o_orderdate@1)], filter=l_orderkey@1 < o_orderkey@0 - 10", + " ProjectionExec: expr=[l_orderkey@0 as l_orderkey, l_suppkey@1 as l_suppkey, l_shipdate@3 as 
l_shipdate]", + " CoalesceBatchesExec: target_batch_size=8192", + " FilterExec: l_returnflag@2 = R", + " CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/lineitem.csv]]}, projection=[l_orderkey, l_suppkey, l_returnflag, l_shipdate], infinite_source=true, output_ordering=[l_orderkey@0 ASC NULLS LAST], has_header=true", + " ProjectionExec: expr=[o_orderkey@1 as o_orderkey, o_orderdate@2 as o_orderdate]", + " SortMergeJoin: join_type=Inner, on=[(c_custkey@0, o_orderkey@0)]", + " ProjectionExec: expr=[c_custkey@2 as c_custkey]", + " SlidingHashJoinExec: join_type=Inner, on=[(n_regionkey@1, c_nationkey@1)], filter=n_nationkey@1 > c_custkey@0 AND n_nationkey@1 < c_custkey@0 + 20", + " CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/nation.csv]]}, projection=[n_nationkey, n_regionkey], infinite_source=true, output_ordering=[n_nationkey@0 ASC NULLS LAST], has_header=true", + " CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/customer.csv]]}, projection=[c_custkey, c_nationkey], infinite_source=true, output_ordering=[c_custkey@0 ASC NULLS LAST], has_header=true", + " CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/orders.csv]]}, projection=[o_orderkey, o_orderdate], infinite_source=true, output_ordering=[o_orderkey@0 ASC NULLS LAST], has_header=true", ]; experiment(&expected_plan, sql).await?; @@ -4207,11 +4205,10 @@ mod sql_fuzzy_tests { " PartitionedHashJoinExec: join_type=Inner, on=[(c_nationkey@1, n_regionkey@1)], filter=n_nationkey@1 > c_custkey@0", " ProjectionExec: expr=[c_custkey@1 as c_custkey, c_nationkey@2 as c_nationkey]", " SortMergeJoin: join_type=Inner, on=[(o_orderkey@0, c_custkey@0)]", - " ProjectionExec: expr=[o_orderkey@0 as o_orderkey]", - " ProjectionExec: expr=[o_orderkey@2 as o_orderkey, o_orderdate@3 as o_orderdate, l_orderkey@0 as l_orderkey, l_shipdate@1 as l_shipdate]", - " SlidingHashJoinExec: join_type=Inner, 
on=[(l_shipdate@1, o_orderdate@1)], filter=l_orderkey@1 < o_orderkey@0 - 10 AND l_orderkey@1 > o_orderkey@0 + 10", - " CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/lineitem.csv]]}, projection=[l_orderkey, l_shipdate], infinite_source=true, output_ordering=[l_orderkey@0 ASC NULLS LAST], has_header=true", - " CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/orders.csv]]}, projection=[o_orderkey, o_orderdate], infinite_source=true, output_ordering=[o_orderkey@0 ASC NULLS LAST], has_header=true", + " ProjectionExec: expr=[o_orderkey@2 as o_orderkey]", + " SlidingHashJoinExec: join_type=Inner, on=[(l_shipdate@1, o_orderdate@1)], filter=l_orderkey@1 < o_orderkey@0 - 10 AND l_orderkey@1 > o_orderkey@0 + 10", + " CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/lineitem.csv]]}, projection=[l_orderkey, l_shipdate], infinite_source=true, output_ordering=[l_orderkey@0 ASC NULLS LAST], has_header=true", + " CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/orders.csv]]}, projection=[o_orderkey, o_orderdate], infinite_source=true, output_ordering=[o_orderkey@0 ASC NULLS LAST], has_header=true", " CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/customer.csv]]}, projection=[c_custkey, c_nationkey], infinite_source=true, output_ordering=[c_custkey@0 ASC NULLS LAST], has_header=true", " CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/nation.csv]]}, projection=[n_nationkey, n_regionkey], infinite_source=true, output_ordering=[n_nationkey@0 ASC NULLS LAST], has_header=true", ]; @@ -4254,11 +4251,10 @@ mod sql_fuzzy_tests { " PartitionedHashJoinExec: join_type=Inner, on=[(c_nationkey@1, n_regionkey@1)], filter=n_nationkey@1 > c_custkey@0", " ProjectionExec: expr=[c_custkey@1 as c_custkey, c_nationkey@2 as c_nationkey]", " SortMergeJoin: join_type=Inner, on=[(o_orderkey@0, c_custkey@0)]", - " 
ProjectionExec: expr=[o_orderkey@0 as o_orderkey]", - " ProjectionExec: expr=[o_orderkey@2 as o_orderkey, o_orderdate@3 as o_orderdate, l_orderkey@0 as l_orderkey, l_shipdate@1 as l_shipdate]", - " SlidingHashJoinExec: join_type=Inner, on=[(l_shipdate@1, o_orderdate@1)], filter=l_orderkey@1 < o_orderkey@0 - 10 AND l_orderkey@1 > o_orderkey@0 + 10", - " CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/lineitem.csv]]}, projection=[l_orderkey, l_shipdate], infinite_source=true, output_ordering=[l_orderkey@0 ASC NULLS LAST], has_header=true", - " CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/orders.csv]]}, projection=[o_orderkey, o_orderdate], infinite_source=true, output_ordering=[o_orderkey@0 ASC NULLS LAST], has_header=true", + " ProjectionExec: expr=[o_orderkey@2 as o_orderkey]", + " SlidingHashJoinExec: join_type=Inner, on=[(l_shipdate@1, o_orderdate@1)], filter=l_orderkey@1 < o_orderkey@0 - 10 AND l_orderkey@1 > o_orderkey@0 + 10", + " CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/lineitem.csv]]}, projection=[l_orderkey, l_shipdate], infinite_source=true, output_ordering=[l_orderkey@0 ASC NULLS LAST], has_header=true", + " CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/orders.csv]]}, projection=[o_orderkey, o_orderdate], infinite_source=true, output_ordering=[o_orderkey@0 ASC NULLS LAST], has_header=true", " CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/customer.csv]]}, projection=[c_custkey, c_nationkey], infinite_source=true, output_ordering=[c_custkey@0 ASC NULLS LAST], has_header=true", " CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/nation.csv]]}, projection=[n_nationkey, n_regionkey], infinite_source=true, output_ordering=[n_nationkey@0 ASC NULLS LAST], has_header=true", ]; @@ -4283,14 +4279,13 @@ mod sql_fuzzy_tests { let expected_plan = [ "ProjectionExec: 
expr=[o_orderkey@0 as o_orderkey, LAST_VALUE(lineitem.l_suppkey) ORDER BY [lineitem.l_orderkey ASC NULLS LAST]@1 as amount_usd]", " AggregateExec: mode=Single, gby=[o_orderkey@0 as o_orderkey], aggr=[LAST_VALUE(lineitem.l_suppkey)], ordering_mode=FullyOrdered", - " ProjectionExec: expr=[o_orderkey@0 as o_orderkey, l_orderkey@2 as l_orderkey, l_suppkey@3 as l_suppkey]", - " ProjectionExec: expr=[o_orderkey@3 as o_orderkey, o_orderdate@4 as o_orderdate, l_orderkey@0 as l_orderkey, l_suppkey@1 as l_suppkey, l_shipdate@2 as l_shipdate]", - " PartitionedHashJoinExec: join_type=Inner, on=[(l_shipdate@2, o_orderdate@1)], filter=l_orderkey@1 < o_orderkey@0 - 10", - " ProjectionExec: expr=[l_orderkey@0 as l_orderkey, l_suppkey@1 as l_suppkey, l_shipdate@3 as l_shipdate]", - " CoalesceBatchesExec: target_batch_size=8192", - " FilterExec: l_returnflag@2 = R", - " CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/lineitem.csv]]}, projection=[l_orderkey, l_suppkey, l_returnflag, l_shipdate], infinite_source=true, output_ordering=[l_orderkey@0 ASC NULLS LAST], has_header=true", - " CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/orders.csv]]}, projection=[o_orderkey, o_orderdate], infinite_source=true, output_ordering=[o_orderkey@0 ASC NULLS LAST], has_header=true", + " ProjectionExec: expr=[o_orderkey@3 as o_orderkey, l_orderkey@0 as l_orderkey, l_suppkey@1 as l_suppkey]", + " PartitionedHashJoinExec: join_type=Inner, on=[(l_shipdate@2, o_orderdate@1)], filter=l_orderkey@1 < o_orderkey@0 - 10", + " ProjectionExec: expr=[l_orderkey@0 as l_orderkey, l_suppkey@1 as l_suppkey, l_shipdate@3 as l_shipdate]", + " CoalesceBatchesExec: target_batch_size=8192", + " FilterExec: l_returnflag@2 = R", + " CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/lineitem.csv]]}, projection=[l_orderkey, l_suppkey, l_returnflag, l_shipdate], infinite_source=true, output_ordering=[l_orderkey@0 ASC NULLS LAST], 
has_header=true", + " CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/orders.csv]]}, projection=[o_orderkey, o_orderdate], infinite_source=true, output_ordering=[o_orderkey@0 ASC NULLS LAST], has_header=true", ]; experiment(&expected_plan, sql).await?; @@ -4314,14 +4309,13 @@ mod sql_fuzzy_tests { let expected_plan = [ "ProjectionExec: expr=[o_orderkey@0 as o_orderkey, LAST_VALUE(lineitem.l_suppkey) ORDER BY [lineitem.l_orderkey ASC NULLS LAST]@1 as amount_usd]", " AggregateExec: mode=Single, gby=[o_orderkey@0 as o_orderkey], aggr=[LAST_VALUE(lineitem.l_suppkey)], ordering_mode=FullyOrdered", - " ProjectionExec: expr=[o_orderkey@0 as o_orderkey, l_orderkey@2 as l_orderkey, l_suppkey@3 as l_suppkey]", - " ProjectionExec: expr=[o_orderkey@3 as o_orderkey, o_orderdate@4 as o_orderdate, l_orderkey@0 as l_orderkey, l_suppkey@1 as l_suppkey, l_shipdate@2 as l_shipdate]", - " SlidingHashJoinExec: join_type=Inner, on=[(l_shipdate@2, o_orderdate@1)], filter=l_orderkey@1 < o_orderkey@0 - 10 AND l_orderkey@1 > o_orderkey@0 + 10", - " ProjectionExec: expr=[l_orderkey@0 as l_orderkey, l_suppkey@1 as l_suppkey, l_shipdate@3 as l_shipdate]", - " CoalesceBatchesExec: target_batch_size=8192", - " FilterExec: l_returnflag@2 = R", - " CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/lineitem.csv]]}, projection=[l_orderkey, l_suppkey, l_returnflag, l_shipdate], infinite_source=true, output_ordering=[l_orderkey@0 ASC NULLS LAST], has_header=true", - " CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/orders.csv]]}, projection=[o_orderkey, o_orderdate], infinite_source=true, output_ordering=[o_orderkey@0 ASC NULLS LAST], has_header=true", + " ProjectionExec: expr=[o_orderkey@3 as o_orderkey, l_orderkey@0 as l_orderkey, l_suppkey@1 as l_suppkey]", + " SlidingHashJoinExec: join_type=Inner, on=[(l_shipdate@2, o_orderdate@1)], filter=l_orderkey@1 < o_orderkey@0 - 10 AND l_orderkey@1 > o_orderkey@0 + 
10", + " ProjectionExec: expr=[l_orderkey@0 as l_orderkey, l_suppkey@1 as l_suppkey, l_shipdate@3 as l_shipdate]", + " CoalesceBatchesExec: target_batch_size=8192", + " FilterExec: l_returnflag@2 = R", + " CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/lineitem.csv]]}, projection=[l_orderkey, l_suppkey, l_returnflag, l_shipdate], infinite_source=true, output_ordering=[l_orderkey@0 ASC NULLS LAST], has_header=true", + " CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/orders.csv]]}, projection=[o_orderkey, o_orderdate], infinite_source=true, output_ordering=[o_orderkey@0 ASC NULLS LAST], has_header=true", ]; experiment(&expected_plan, sql).await?; @@ -4345,14 +4339,13 @@ mod sql_fuzzy_tests { let expected_plan = [ "ProjectionExec: expr=[o_orderkey@0 as o_orderkey, AVG(lineitem.l_suppkey)@1 as amount_usd]", " AggregateExec: mode=Single, gby=[o_orderkey@0 as o_orderkey], aggr=[AVG(lineitem.l_suppkey)], ordering_mode=FullyOrdered", - " ProjectionExec: expr=[o_orderkey@0 as o_orderkey, l_suppkey@3 as l_suppkey]", - " ProjectionExec: expr=[o_orderkey@3 as o_orderkey, o_orderdate@4 as o_orderdate, l_orderkey@0 as l_orderkey, l_suppkey@1 as l_suppkey, l_shipdate@2 as l_shipdate]", - " SlidingHashJoinExec: join_type=Right, on=[(l_shipdate@2, o_orderdate@1)], filter=l_orderkey@1 < o_orderkey@0 - 10 AND l_orderkey@1 > o_orderkey@0 + 10", - " ProjectionExec: expr=[l_orderkey@0 as l_orderkey, l_suppkey@1 as l_suppkey, l_shipdate@3 as l_shipdate]", - " CoalesceBatchesExec: target_batch_size=8192", - " FilterExec: l_returnflag@2 = R", - " CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/lineitem.csv]]}, projection=[l_orderkey, l_suppkey, l_returnflag, l_shipdate], infinite_source=true, output_ordering=[l_orderkey@0 ASC NULLS LAST], has_header=true", - " CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/orders.csv]]}, projection=[o_orderkey, o_orderdate], 
infinite_source=true, output_ordering=[o_orderkey@0 ASC NULLS LAST], has_header=true", + " ProjectionExec: expr=[o_orderkey@3 as o_orderkey, l_suppkey@1 as l_suppkey]", + " SlidingHashJoinExec: join_type=Right, on=[(l_shipdate@2, o_orderdate@1)], filter=l_orderkey@1 < o_orderkey@0 - 10 AND l_orderkey@1 > o_orderkey@0 + 10", + " ProjectionExec: expr=[l_orderkey@0 as l_orderkey, l_suppkey@1 as l_suppkey, l_shipdate@3 as l_shipdate]", + " CoalesceBatchesExec: target_batch_size=8192", + " FilterExec: l_returnflag@2 = R", + " CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/lineitem.csv]]}, projection=[l_orderkey, l_suppkey, l_returnflag, l_shipdate], infinite_source=true, output_ordering=[l_orderkey@0 ASC NULLS LAST], has_header=true", + " CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/orders.csv]]}, projection=[o_orderkey, o_orderdate], infinite_source=true, output_ordering=[o_orderkey@0 ASC NULLS LAST], has_header=true", ]; experiment(&expected_plan, sql).await?; @@ -4372,12 +4365,11 @@ mod sql_fuzzy_tests { let expected_plan = [ "AggregateExec: mode=Single, gby=[o_orderkey@0 as o_orderkey], aggr=[], ordering_mode=FullyOrdered", - " ProjectionExec: expr=[o_orderkey@0 as o_orderkey]", - " ProjectionExec: expr=[o_orderkey@1 as o_orderkey, o_comment@2 as o_comment, r_comment@0 as r_comment]", - " CoalesceBatchesExec: target_batch_size=8192", - " HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(r_comment@0, o_comment@1)]", - " CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/region.csv]]}, projection=[r_comment], has_header=true", - " CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/orders.csv]]}, projection=[o_orderkey, o_comment], infinite_source=true, output_ordering=[o_orderkey@0 ASC NULLS LAST], has_header=true", + " ProjectionExec: expr=[o_orderkey@1 as o_orderkey]", + " CoalesceBatchesExec: target_batch_size=8192", + " HashJoinExec: 
mode=CollectLeft, join_type=Inner, on=[(r_comment@0, o_comment@1)]", + " CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/region.csv]]}, projection=[r_comment], has_header=true", + " CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/orders.csv]]}, projection=[o_orderkey, o_comment], infinite_source=true, output_ordering=[o_orderkey@0 ASC NULLS LAST], has_header=true", ]; experiment(&expected_plan, sql).await?; diff --git a/datafusion/core/src/physical_optimizer/mod.rs b/datafusion/core/src/physical_optimizer/mod.rs index 3c7668206d2d..84058bc880a1 100644 --- a/datafusion/core/src/physical_optimizer/mod.rs +++ b/datafusion/core/src/physical_optimizer/mod.rs @@ -34,12 +34,13 @@ pub mod pipeline_checker; mod projection_pushdown; pub mod pruning; pub mod replace_with_order_preserving_variants; -mod sort_pushdown; +#[cfg(test)] +pub mod test_utils; pub mod topk_aggregation; -mod utils; mod join_pipeline_selection; -#[cfg(test)] -pub mod test_utils; +mod projection_pushdown; +mod sort_pushdown; +mod utils; pub use optimizer::PhysicalOptimizerRule; diff --git a/datafusion/core/src/physical_optimizer/pipeline_checker.rs b/datafusion/core/src/physical_optimizer/pipeline_checker.rs index ca4edf9b1d7c..ad3e11eb2e76 100644 --- a/datafusion/core/src/physical_optimizer/pipeline_checker.rs +++ b/datafusion/core/src/physical_optimizer/pipeline_checker.rs @@ -24,7 +24,6 @@ use std::sync::Arc; use crate::config::ConfigOptions; use crate::error::Result; use crate::physical_optimizer::PhysicalOptimizerRule; -use crate::physical_plan::joins::SymmetricHashJoinExec; use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan}; use datafusion_common::config::OptimizerOptions; @@ -32,6 +31,7 @@ use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion}; use datafusion_common::{plan_err, DataFusionError}; use datafusion_physical_expr::intervals::utils::{check_support, is_datatype_supported}; use 
datafusion_physical_plan::joins::prunability::is_filter_expr_prunable; +use datafusion_physical_plan::joins::SymmetricHashJoinExec; /// The PipelineChecker rule rejects non-runnable query plans that use /// pipeline-breaking operators on infinite input(s). diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs b/datafusion/core/src/physical_optimizer/projection_pushdown.rs index 664afbe822ff..3123e0ca33e4 100644 --- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs @@ -15,6 +15,9 @@ // specific language governing permissions and limitations // under the License. +// Copyright (C) Synnada, Inc. - All Rights Reserved. +// This file does not contain any Apache Software Foundation copyrighted code. + //! This file implements the `ProjectionPushdown` physical optimization rule. //! The function [`remove_unnecessary_projections`] tries to push down all //! projections one by one if the operator below is amenable to this. If a @@ -29,10 +32,10 @@ use crate::datasource::physical_plan::CsvExec; use crate::error::Result; use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; use crate::physical_plan::filter::FilterExec; -use crate::physical_plan::joins::utils::{ColumnIndex, JoinFilter}; +use crate::physical_plan::joins::utils::{ColumnIndex, JoinFilter, JoinSide}; use crate::physical_plan::joins::{ - CrossJoinExec, HashJoinExec, NestedLoopJoinExec, SortMergeJoinExec, - SymmetricHashJoinExec, + CrossJoinExec, HashJoinExec, NestedLoopJoinExec, SlidingHashJoinExec, + SortMergeJoinExec, SymmetricHashJoinExec, }; use crate::physical_plan::memory::MemoryExec; use crate::physical_plan::projection::ProjectionExec; @@ -133,6 +136,8 @@ pub fn remove_unnecessary_projections( try_swapping_with_cross_join(projection, cross_join)? } else if let Some(nl_join) = input.downcast_ref::<NestedLoopJoinExec>() { try_swapping_with_nested_loop_join(projection, nl_join)?
+ } else if let Some(sh_join) = input.downcast_ref::<SlidingHashJoinExec>() { + try_swapping_with_sliding_hash_join(projection, sh_join)? } else if let Some(sm_join) = input.downcast_ref::<SortMergeJoinExec>() { try_swapping_with_sort_merge_join(projection, sm_join)? } else if let Some(sym_join) = input.downcast_ref::<SymmetricHashJoinExec>() { @@ -679,6 +684,85 @@ fn try_swapping_with_nested_loop_join( )?))) } +/// Tries to swap the projection with its input [`SlidingHashJoinExec`]. If it can be done, +/// it returns the new swapped version having the [`SlidingHashJoinExec`] as the top plan. +/// Otherwise, it returns None. +fn try_swapping_with_sliding_hash_join( + projection: &ProjectionExec, + sh_join: &SlidingHashJoinExec, +) -> Result<Option<Arc<dyn ExecutionPlan>>> { + // Convert projected PhysicalExpr's to columns. If not possible, we cannot proceed. + let Some(projection_as_columns) = physical_to_column_exprs(projection.expr()) else { + return Ok(None); + }; + + let (far_right_left_col_ind, far_left_right_col_ind) = join_table_borders( + sh_join.left().schema().fields().len(), + &projection_as_columns, + ); + + if !join_allows_pushdown( + &projection_as_columns, + sh_join.schema(), + far_right_left_col_ind, + far_left_right_col_ind, + ) { + return Ok(None); + } + + let Some(new_on) = update_join_on( + &projection_as_columns[0..=far_right_left_col_ind as _], + &projection_as_columns[far_left_right_col_ind as _..], + sh_join.on(), + ) else { + return Ok(None); + }; + + let Some(new_filter) = update_join_filter( + &projection_as_columns[0..=far_right_left_col_ind as _], + &projection_as_columns[far_left_right_col_ind as _..], + sh_join.filter(), + sh_join.left(), + sh_join.right(), + ) else { + return Ok(None); + }; + + let (new_left_sort, new_right_sort) = match ( + update_sort_expr( + sh_join.left_sort_exprs(), + &projection.expr()[0..=far_right_left_col_ind as _], + )?, + update_sort_expr( + sh_join.right_sort_exprs(), + &projection.expr()[far_left_right_col_ind as _..], + )?, + ) { + (Some(left), Some(right)) => (left, right), + _ => return
Ok(None), + }; + + let (new_left, new_right) = new_join_children( + projection_as_columns, + far_right_left_col_ind, + far_left_right_col_ind, + sh_join.left(), + sh_join.right(), + )?; + + Ok(Some(Arc::new(SlidingHashJoinExec::try_new( + Arc::new(new_left), + Arc::new(new_right), + new_on, + new_filter, + sh_join.join_type(), + sh_join.null_equals_null(), + new_left_sort, + new_right_sort, + *sh_join.partition_mode(), + )?))) +} + /// Tries to swap the projection with its input [`SortMergeJoinExec`]. If it can be done, /// it returns the new swapped version having the [`SortMergeJoinExec`] as the top plan. /// Otherwise, it returns None. @@ -876,7 +960,6 @@ fn update_expr( /// references could not be. RewrittenInvalid, } - let mut state = RewriteState::Unchanged; let new_expr = expr @@ -1156,6 +1239,25 @@ fn new_join_children( Ok((new_left, new_right)) } +/// The [`PhysicalSortExpr`] counterpart of the [`update_expr`] function. +fn update_sort_expr( + sort_exprs: &[PhysicalSortExpr], + projected_exprs: &[(Arc<dyn PhysicalExpr>, String)], +) -> Result<Option<Vec<PhysicalSortExpr>>> { + let mut new_sort_exprs = vec![]; + for sort_expr in sort_exprs { + let Some(updated_expr) = update_expr(&sort_expr.expr, projected_exprs, false)?
+ else { + return Ok(None); + }; + new_sort_exprs.push(PhysicalSortExpr { + expr: updated_expr, + options: sort_expr.options, + }) + } + Ok(Some(new_sort_exprs)) +} + #[cfg(test)] mod tests { use std::sync::Arc; @@ -1170,7 +1272,7 @@ mod tests { use crate::physical_optimizer::PhysicalOptimizerRule; use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; use crate::physical_plan::filter::FilterExec; - use crate::physical_plan::joins::utils::{ColumnIndex, JoinFilter}; + use crate::physical_plan::joins::utils::{ColumnIndex, JoinFilter, JoinSide}; use crate::physical_plan::joins::StreamJoinPartitionMode; use crate::physical_plan::memory::MemoryExec; use crate::physical_plan::projection::ProjectionExec; @@ -1192,7 +1294,7 @@ mod tests { Distribution, Partitioning, PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement, ScalarFunctionExpr, }; - use datafusion_physical_plan::joins::SymmetricHashJoinExec; + use datafusion_physical_plan::joins::{SymmetricHashJoinExec, SlidingHashJoinExec}; use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec}; use datafusion_physical_plan::union::UnionExec; diff --git a/datafusion/expr/src/signature.rs b/datafusion/expr/src/signature.rs index 685601523f9b..04ee059f9d2a 100644 --- a/datafusion/expr/src/signature.rs +++ b/datafusion/expr/src/signature.rs @@ -249,7 +249,7 @@ impl Signature { /// the function's behavior is monotonic, or non-monotonic/unknown for that argument, namely: /// - `None` signifies unknown monotonicity or non-monotonicity. /// - `Some(true)` indicates that the function is monotonically increasing w.r.t. the argument in question. -/// - Some(false) indicates that the function is monotonically decreasing w.r.t. the argument in question. +/// - `Some(false)` indicates that the function is monotonically decreasing w.r.t. the argument in question. 
pub type FuncMonotonicity = Vec<Option<bool>>; #[cfg(test)] diff --git a/datafusion/physical-expr/src/expressions/negative.rs b/datafusion/physical-expr/src/expressions/negative.rs index b64b4a0c86de..af7e6c74d9b8 100644 --- a/datafusion/physical-expr/src/expressions/negative.rs +++ b/datafusion/physical-expr/src/expressions/negative.rs @@ -21,6 +21,7 @@ use std::any::Any; use std::hash::{Hash, Hasher}; use std::sync::Arc; +use crate::intervals::Interval; use crate::physical_expr::down_cast_any_ref; use crate::sort_properties::SortProperties; use crate::PhysicalExpr; diff --git a/datafusion/physical-expr/src/functions.rs b/datafusion/physical-expr/src/functions.rs index 53de85843919..89b399b9ff2e 100644 --- a/datafusion/physical-expr/src/functions.rs +++ b/datafusion/physical-expr/src/functions.rs @@ -30,26 +30,28 @@ //! an argument i32 is passed to a function that supports f64, the //! argument is automatically coerced to f64. +use std::ops::Neg; +use std::sync::Arc; + use crate::execution_props::ExecutionProps; +use crate::expressions::{cast_column, nullif_func}; use crate::sort_properties::SortProperties; use crate::{ array_expressions, conditional_expressions, datetime_expressions, expressions::nullif_func, math_expressions, string_expressions, struct_expressions, PhysicalExpr, ScalarFunctionExpr, }; + use arrow::{ array::ArrayRef, compute::kernels::length::{bit_length, length}, - datatypes::{DataType, Int32Type, Int64Type, Schema}, + datatypes::{DataType, Int32Type, Int64Type, Schema, TimeUnit}, }; use datafusion_common::{internal_err, DataFusionError, Result, ScalarValue}; -pub use datafusion_expr::FuncMonotonicity; use datafusion_expr::{ type_coercion::functions::data_types, BuiltinScalarFunction, ColumnarValue, ScalarFunctionImplementation, }; -use std::ops::Neg; -use std::sync::Arc; /// Create a physical (function) expression. /// This function errors when `args` can't be coerced to a valid argument type of the function.
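As an aside on the `FuncMonotonicity` convention documented above (`None` / `Some(true)` / `Some(false)` per argument), the following standalone sketch shows how such a flag can translate a sorted argument into an output ordering. The `Order` enum and `output_order` helper are hypothetical illustrations, not part of DataFusion's API:

```rust
// Hypothetical sketch: interpreting one entry of a per-argument
// monotonicity vector (as in `FuncMonotonicity = Vec<Option<bool>>`)
// for a function applied to a single sorted argument.
#[derive(Debug, PartialEq, Clone, Copy)]
enum Order {
    Asc,
    Desc,
    Unknown,
}

fn output_order(arg_order: Order, monotonicity: Option<bool>) -> Order {
    match (monotonicity, arg_order) {
        // Monotonically increasing: the argument's ordering is preserved.
        (Some(true), o) => o,
        // Monotonically decreasing: the ordering direction is flipped.
        (Some(false), Order::Asc) => Order::Desc,
        (Some(false), Order::Desc) => Order::Asc,
        // Unknown or non-monotonic: no ordering can be guaranteed.
        _ => Order::Unknown,
    }
}

fn main() {
    // e.g. exp(x) is increasing, so an ASC input stays ASC.
    assert_eq!(output_order(Order::Asc, Some(true)), Order::Asc);
    // e.g. -x is decreasing, so an ASC input comes out DESC.
    assert_eq!(output_order(Order::Asc, Some(false)), Order::Desc);
}
```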
@@ -979,8 +981,8 @@ fn func_order_in_one_dimension( #[cfg(test)] mod tests { use super::*; - use crate::expressions::try_cast; - use crate::expressions::{col, lit}; + use crate::expressions::{col, lit, try_cast}; + use arrow::{ array::{ Array, ArrayRef, BinaryArray, BooleanArray, Float32Array, Float64Array, @@ -990,8 +992,7 @@ mod tests { record_batch::RecordBatch, }; use datafusion_common::cast::as_uint64_array; - use datafusion_common::{exec_err, plan_err}; - use datafusion_common::{Result, ScalarValue}; + use datafusion_common::{exec_err, plan_err, Result, ScalarValue}; use datafusion_expr::type_coercion::functions::data_types; use datafusion_expr::Signature; diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt index a51c3aed13ec..277a69dc6782 100644 --- a/datafusion/sqllogictest/test_files/explain.slt +++ b/datafusion/sqllogictest/test_files/explain.slt @@ -255,7 +255,6 @@ physical_plan after ProjectionPushdown SAME TEXT AS ABOVE physical_plan CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true physical_plan_with_stats CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true, statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]] - ### tests for EXPLAIN with display statistics enabled statement ok set datafusion.explain.show_statistics = true; diff --git a/datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt b/datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt index 2a6fc6914a97..c7b00bf65622 100644 --- a/datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt +++ b/datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt @@ -283,13 +283,12 @@ Sort: subquery.c_custkey ASC NULLS LAST, subquery.price_rank ASC NULLS LAST ----------WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY 
[customer.c_custkey ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] ------------TableScan: customer projection=[c_custkey, c_nationkey] physical_plan -ProjectionExec: expr=[c_custkey@2 as c_custkey, c_nationkey@3 as c_nationkey, price_rank@4 as price_rank, n_name@1 as n_name] ---ProjectionExec: expr=[n_nationkey@3 as n_nationkey, n_name@4 as n_name, c_custkey@0 as c_custkey, c_nationkey@1 as c_nationkey, price_rank@2 as price_rank] -----SortMergeJoin: join_type=Inner, on=[(c_custkey@0, n_nationkey@0)] -------ProjectionExec: expr=[c_custkey@0 as c_custkey, c_nationkey@1 as c_nationkey, ROW_NUMBER() ORDER BY [customer.c_custkey ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as price_rank] ---------BoundedWindowAggExec: wdw=[ROW_NUMBER() ORDER BY [customer.c_custkey ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "ROW_NUMBER() ORDER BY [customer.c_custkey ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }], mode=[Sorted] -----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/customer.csv]]}, projection=[c_custkey, c_nationkey], infinite_source=true, output_ordering=[c_custkey@0 ASC NULLS LAST], has_header=false -------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/nation.csv]]}, projection=[n_nationkey, n_name], infinite_source=true, output_ordering=[n_nationkey@0 ASC NULLS LAST], has_header=false +ProjectionExec: expr=[c_custkey@0 as c_custkey, c_nationkey@1 as c_nationkey, price_rank@2 as price_rank, n_name@4 as n_name] +--SortMergeJoin: join_type=Inner, on=[(c_custkey@0, n_nationkey@0)] +----ProjectionExec: expr=[c_custkey@0 as c_custkey, c_nationkey@1 as c_nationkey, ROW_NUMBER() ORDER BY [customer.c_custkey ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as price_rank] +------BoundedWindowAggExec: wdw=[ROW_NUMBER() ORDER BY [customer.c_custkey ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "ROW_NUMBER() ORDER BY [customer.c_custkey ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }], mode=[Sorted] +--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/customer.csv]]}, projection=[c_custkey, c_nationkey], infinite_source=true, output_ordering=[c_custkey@0 ASC NULLS LAST], has_header=false +----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/nation.csv]]}, projection=[n_nationkey, n_name], infinite_source=true, output_ordering=[n_nationkey@0 ASC NULLS LAST], has_header=false query TT EXPLAIN SELECT @@ -374,18 +373,16 @@ Sort: nation.n_nationkey ASC NULLS LAST ----TableScan: nation projection=[n_nationkey, n_name, n_regionkey, n_comment, n_rev] physical_plan SlidingNestedLoopJoinExec: join_type=Inner, filter=n_nationkey@1 > c_custkey@0 AND n_nationkey@1 < c_custkey@0 + 20 ---ProjectionExec: expr=[c_custkey@0 as c_custkey, c_nationkey@1 as c_nationkey] -----ProjectionExec: expr=[c_custkey@0 as c_custkey, c_nationkey@1 as c_nationkey, o_orderkey@2 as o_orderkey] -------ProjectionExec: expr=[c_custkey@2 as c_custkey, c_nationkey@3 as c_nationkey, o_orderkey@4 as o_orderkey, o_orderdate@5 as o_orderdate, l_orderkey@0 as l_orderkey, l_shipdate@1 as l_shipdate] ---------SlidingHashJoinExec: join_type=Inner, on=[(l_shipdate@1, o_orderdate@3)], filter=l_orderkey@1 >= o_orderkey@0 - 10 AND l_orderkey@1 < o_orderkey@0 + 10 -----------ProjectionExec: expr=[l_orderkey@0 as l_orderkey, l_shipdate@2 as l_shipdate] -------------CoalesceBatchesExec: target_batch_size=8192 
---------------FilterExec: l_returnflag@1 = R -----------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/lineitem.csv]]}, projection=[l_orderkey, l_returnflag, l_shipdate], infinite_source=true, output_ordering=[l_orderkey@0 ASC NULLS LAST], has_header=false -----------ProjectionExec: expr=[c_custkey@2 as c_custkey, c_nationkey@3 as c_nationkey, o_orderkey@0 as o_orderkey, o_orderdate@1 as o_orderdate] -------------SortMergeJoin: join_type=Inner, on=[(o_orderkey@0, c_custkey@0)] ---------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/orders.csv]]}, projection=[o_orderkey, o_orderdate], infinite_source=true, output_ordering=[o_orderkey@0 ASC NULLS LAST], has_header=false ---------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/customer.csv]]}, projection=[c_custkey, c_nationkey], infinite_source=true, output_ordering=[c_custkey@0 ASC NULLS LAST], has_header=false +--ProjectionExec: expr=[c_custkey@2 as c_custkey, c_nationkey@3 as c_nationkey] +----SlidingHashJoinExec: join_type=Inner, on=[(l_shipdate@1, o_orderdate@3)], filter=l_orderkey@1 >= o_orderkey@0 - 10 AND l_orderkey@1 < o_orderkey@0 + 10 +------ProjectionExec: expr=[l_orderkey@0 as l_orderkey, l_shipdate@2 as l_shipdate] +--------CoalesceBatchesExec: target_batch_size=8192 +----------FilterExec: l_returnflag@1 = R +------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/lineitem.csv]]}, projection=[l_orderkey, l_returnflag, l_shipdate], infinite_source=true, output_ordering=[l_orderkey@0 ASC NULLS LAST], has_header=false +------ProjectionExec: expr=[c_custkey@2 as c_custkey, c_nationkey@3 as c_nationkey, o_orderkey@0 as o_orderkey, o_orderdate@1 as o_orderdate] +--------SortMergeJoin: join_type=Inner, on=[(o_orderkey@0, c_custkey@0)] +----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/orders.csv]]}, projection=[o_orderkey, 
o_orderdate], infinite_source=true, output_ordering=[o_orderkey@0 ASC NULLS LAST], has_header=false +----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/customer.csv]]}, projection=[c_custkey, c_nationkey], infinite_source=true, output_ordering=[c_custkey@0 ASC NULLS LAST], has_header=false --CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/nation.csv]]}, projection=[n_nationkey, n_name, n_regionkey, n_comment, n_rev], infinite_source=true, output_ordering=[n_nationkey@0 ASC NULLS LAST], has_header=false query TT diff --git a/datafusion/sqllogictest/test_files/stream.slt b/datafusion/sqllogictest/test_files/stream.slt index e6ed6e83248d..cfcb652acb42 100644 --- a/datafusion/sqllogictest/test_files/stream.slt +++ b/datafusion/sqllogictest/test_files/stream.slt @@ -200,10 +200,8 @@ ProjectionExec: expr=[ROW_NUMBER() ORDER BY [u.ts ASC NULLS LAST] RANGE BETWEEN --BoundedWindowAggExec: wdw=[ROW_NUMBER() ORDER BY [u.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "ROW_NUMBER() ORDER BY [u.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(IntervalMonthDayNano("NULL")), end_bound: CurrentRow }], mode=[Sorted] ----SortPreservingMergeExec: [ts@0 ASC NULLS LAST] ------UnionExec ---------ProjectionExec: expr=[ts@0 as ts, currency@2 as currency, amount@1 as amount] -----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/sales_us.csv]]}, projection=[ts, amount, currency], infinite_source=true, output_ordering=[ts@0 ASC NULLS LAST], has_header=true ---------ProjectionExec: expr=[ts@0 as ts, currency@2 as currency, amount@1 as amount] -----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/sales_global.csv]]}, projection=[ts, amount, currency], 
infinite_source=true, output_ordering=[ts@0 ASC NULLS LAST], has_header=true +--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/sales_us.csv]]}, projection=[ts, currency, amount], infinite_source=true, output_ordering=[ts@0 ASC NULLS LAST], has_header=true +--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/sales_global.csv]]}, projection=[ts, currency, amount], infinite_source=true, output_ordering=[ts@0 ASC NULLS LAST], has_header=true query PIT SELECT * EXCLUDE(sn) FROM sales_us LIMIT 5 @@ -297,21 +295,19 @@ ProjectionExec: expr=[amount_usd@0 as amount_usd] --------CoalesceBatchesExec: target_batch_size=8192 ----------SortPreservingRepartitionExec: partitioning=Hash([sn@0], 4), input_partitions=4, sort_exprs=sn@0 ASC NULLS LAST ------------AggregateExec: mode=Partial, gby=[sn@0 as sn], aggr=[LAST_VALUE(e.rate)], ordering_mode=FullyOrdered ---------------ProjectionExec: expr=[sn@1 as sn, sn2@4 as sn2, rate@6 as rate] -----------------ProjectionExec: expr=[ts@4 as ts, sn@5 as sn, currency@6 as currency, ts@0 as ts, sn2@1 as sn2, currency_from@2 as currency_from, rate@3 as rate] -------------------PartitionedHashJoinExec: join_type=Inner, on=[(currency_from@2, currency@2)], filter=ts@0 >= ts@1 ---------------------CoalesceBatchesExec: target_batch_size=8192 -----------------------SortPreservingRepartitionExec: partitioning=Hash([currency_from@2], 4), input_partitions=4, sort_exprs=sn2@1 ASC NULLS LAST -------------------------ProjectionExec: expr=[ts@0 as ts, sn2@1 as sn2, currency_from@2 as currency_from, rate@4 as rate] ---------------------------CoalesceBatchesExec: target_batch_size=8192 -----------------------------FilterExec: currency_to@3 = USD -------------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 ---------------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/exchange_rates.csv]]}, projection=[ts, sn2, 
currency_from, currency_to, rate], infinite_source=true, output_ordering=[sn2@1 ASC NULLS LAST], has_header=true ---------------------CoalesceBatchesExec: target_batch_size=8192 -----------------------SortPreservingRepartitionExec: partitioning=Hash([currency@2], 4), input_partitions=4, sort_exprs=sn@1 ASC NULLS LAST -------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 ---------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/sales_global.csv]]}, projection=[ts, sn, currency], infinite_source=true, output_ordering=[sn@1 ASC NULLS LAST], has_header=true - +--------------ProjectionExec: expr=[sn@5 as sn, sn2@1 as sn2, rate@3 as rate] +----------------PartitionedHashJoinExec: join_type=Inner, on=[(currency_from@2, currency@2)], filter=ts@0 >= ts@1 +------------------CoalesceBatchesExec: target_batch_size=8192 +--------------------SortPreservingRepartitionExec: partitioning=Hash([currency_from@2], 4), input_partitions=4, sort_exprs=sn2@1 ASC NULLS LAST +----------------------ProjectionExec: expr=[ts@0 as ts, sn2@1 as sn2, currency_from@2 as currency_from, rate@4 as rate] +------------------------CoalesceBatchesExec: target_batch_size=8192 +--------------------------FilterExec: currency_to@3 = USD +----------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +------------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/exchange_rates.csv]]}, projection=[ts, sn2, currency_from, currency_to, rate], infinite_source=true, output_ordering=[sn2@1 ASC NULLS LAST], has_header=true +------------------CoalesceBatchesExec: target_batch_size=8192 +--------------------SortPreservingRepartitionExec: partitioning=Hash([currency@2], 4), input_partitions=4, sort_exprs=sn@1 ASC NULLS LAST +----------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +------------------------CsvExec: 
file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/sales_global.csv]]}, projection=[ts, sn, currency], infinite_source=true, output_ordering=[sn@1 ASC NULLS LAST], has_header=true # Scalar Subquery not supported # SELECT s.*, @@ -390,21 +386,19 @@ ProjectionExec: expr=[amount_usd@0 as amount_usd] --------CoalesceBatchesExec: target_batch_size=8192 ----------SortPreservingRepartitionExec: partitioning=Hash([sn@0], 4), input_partitions=4, sort_exprs=sn@0 ASC NULLS LAST ------------AggregateExec: mode=Partial, gby=[sn@0 as sn], aggr=[AVG(e.rate)], ordering_mode=FullyOrdered ---------------ProjectionExec: expr=[sn@1 as sn, rate@5 as rate] -----------------ProjectionExec: expr=[ts@3 as ts, sn@4 as sn, currency@5 as currency, ts@0 as ts, currency_from@1 as currency_from, rate@2 as rate] -------------------SlidingHashJoinExec: join_type=Right, on=[(currency_from@1, currency@2)], filter=ts@0 >= ts@1 AND ts@1 >= ts@0 - 600000000000 ---------------------CoalesceBatchesExec: target_batch_size=8192 -----------------------SortPreservingRepartitionExec: partitioning=Hash([currency_from@1], 4), input_partitions=4, sort_exprs=ts@0 ASC NULLS LAST -------------------------ProjectionExec: expr=[ts@0 as ts, currency_from@1 as currency_from, rate@3 as rate] ---------------------------CoalesceBatchesExec: target_batch_size=8192 -----------------------------FilterExec: currency_to@2 = USD -------------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 ---------------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/exchange_rates.csv]]}, projection=[ts, currency_from, currency_to, rate], infinite_source=true, output_ordering=[ts@0 ASC NULLS LAST], has_header=true ---------------------CoalesceBatchesExec: target_batch_size=8192 -----------------------SortPreservingRepartitionExec: partitioning=Hash([currency@2], 4), input_partitions=4, sort_exprs=ts@0 ASC NULLS LAST 
-------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 ---------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/sales_global.csv]]}, projection=[ts, sn, currency], infinite_source=true, output_ordering=[ts@0 ASC NULLS LAST], has_header=true - +--------------ProjectionExec: expr=[sn@4 as sn, rate@2 as rate] +----------------SlidingHashJoinExec: join_type=Right, on=[(currency_from@1, currency@2)], filter=ts@0 >= ts@1 AND ts@1 >= ts@0 - 600000000000 +------------------CoalesceBatchesExec: target_batch_size=8192 +--------------------SortPreservingRepartitionExec: partitioning=Hash([currency_from@1], 4), input_partitions=4, sort_exprs=ts@0 ASC NULLS LAST +----------------------ProjectionExec: expr=[ts@0 as ts, currency_from@1 as currency_from, rate@3 as rate] +------------------------CoalesceBatchesExec: target_batch_size=8192 +--------------------------FilterExec: currency_to@2 = USD +----------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +------------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/exchange_rates.csv]]}, projection=[ts, currency_from, currency_to, rate], infinite_source=true, output_ordering=[ts@0 ASC NULLS LAST], has_header=true +------------------CoalesceBatchesExec: target_batch_size=8192 +--------------------SortPreservingRepartitionExec: partitioning=Hash([currency@2], 4), input_partitions=4, sort_exprs=ts@0 ASC NULLS LAST +----------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/sales_global.csv]]}, projection=[ts, sn, currency], infinite_source=true, output_ordering=[ts@0 ASC NULLS LAST], has_header=true # Explicit Windows in Joins @@ -436,7 +430,6 @@ SortPreservingMergeExec: [ts@0 ASC NULLS LAST] ------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/exchange_rates.csv]]}, projection=[ts, sn, currency_from, currency_to, rate], infinite_source=true, output_ordering=[ts@0 ASC NULLS LAST], has_header=true - # Machine Learning via User-Defined Functions (UDFs) # SELECT e.*, diff --git a/dev/release/rat_exclude_files.txt b/dev/release/rat_exclude_files.txt index 19682d0f8267..070b5e72e367 100644 --- a/dev/release/rat_exclude_files.txt +++ b/dev/release/rat_exclude_files.txt @@ -153,4 +153,5 @@ datafusion/sqllogictest/test_files/stream.slt datafusion/core/tests/data/exchange_rates.csv datafusion/core/tests/data/sales_global.csv datafusion/core/tests/data/sales_us.csv +datafusion/core/src/physical_optimizer/projection_pushdown.rs datafusion/physical-plan/src/joins/prunability.rs
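As a rough intuition for the swap performed by `try_swapping_with_sliding_hash_join` and `update_sort_expr` in this patch: every column reference held by the join (keys, filter, sort expressions) must be remapped through the projection, and the swap bails out (the `Ok(None)` cases above) whenever a referenced column is not exposed. The sketch below is a deliberately simplified, hypothetical helper over bare indices, not DataFusion code:

```rust
// Simplified sketch of the index-remapping rule behind projection pushdown:
// `projection[i]` is the source-schema index that projection output `i`
// reads from. A column reference can be rewritten only if the projection
// still exposes its source column; otherwise the optimization must bail.
fn remap_column(source_index: usize, projection: &[usize]) -> Option<usize> {
    projection.iter().position(|&src| src == source_index)
}

fn main() {
    // A projection keeping source columns [2, 0, 3] as outputs [0, 1, 2].
    let projection = [2usize, 0, 3];
    // A sort expression on source column 0 is rewritten to output column 1.
    assert_eq!(remap_column(0, &projection), Some(1));
    // Source column 1 is dropped, so the swap is abandoned (`Ok(None)`).
    assert_eq!(remap_column(1, &projection), None);
}
```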