diff --git a/datafusion/core/src/physical_optimizer/join_pipeline_selection.rs b/datafusion/core/src/physical_optimizer/join_pipeline_selection.rs index acbd0bf1da5b..698eff8e073a 100644 --- a/datafusion/core/src/physical_optimizer/join_pipeline_selection.rs +++ b/datafusion/core/src/physical_optimizer/join_pipeline_selection.rs @@ -344,15 +344,15 @@ fn check_hash_join_convertable( let left_satisfied = get_indices_of_matching_sort_exprs_with_order_eq( left_order, &on_left, - || hash_join.left().equivalence_properties(), - || hash_join.left().ordering_equivalence_properties(), + &hash_join.left().equivalence_properties(), + &hash_join.left().ordering_equivalence_properties(), ); // Get right key(s)' sort options: let right_satisfied = get_indices_of_matching_sort_exprs_with_order_eq( right_order, &on_right, - || hash_join.right().equivalence_properties(), - || hash_join.right().ordering_equivalence_properties(), + &hash_join.right().equivalence_properties(), + &hash_join.right().ordering_equivalence_properties(), ); if let ( diff --git a/datafusion/core/src/physical_optimizer/test_utils.rs b/datafusion/core/src/physical_optimizer/test_utils.rs index b11a74a47636..5879ab4d36ad 100644 --- a/datafusion/core/src/physical_optimizer/test_utils.rs +++ b/datafusion/core/src/physical_optimizer/test_utils.rs @@ -391,7 +391,7 @@ pub fn memory_exec_with_sort( ) -> Arc { let mem = MemoryExec::try_new(&[], schema.clone(), None).unwrap(); Arc::new(if let Some(sort) = sort { - mem.with_sort_information(sort) + mem.with_sort_information(vec![sort]) } else { mem }) @@ -438,21 +438,23 @@ macro_rules! assert_optimized_orthogonal { let actual = get_plan_string(&optimized_physical_plan); assert_eq!( expected_optimized_lines, actual, - "\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected_optimized_lines:#?}\nactual:\n\n{actual:#?}\n\n" - ); - // Run EnforceSorting - JoinSelection - let optimized_physical_plan_2 = - EnforceSorting::new().optimize(physical_plan.clone(), state.config_options())?; - let optimized_physical_plan_2 = JoinSelection::new().optimize(optimized_physical_plan_2.clone(), state.config_options())?; - - assert_eq!(physical_plan.schema(), optimized_physical_plan_2.schema()); - - // Get string representation of the plan - let actual = get_plan_string(&optimized_physical_plan_2); - assert_eq!( - expected_optimized_lines, actual, - "\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected_optimized_lines:#?}\nactual:\n\n{actual:#?}\n\n" + "\n**JoinSelection - EnforceSorting Optimized Plan Mismatch\n\nexpected:\n\n{expected_optimized_lines:#?}\nactual:\n\n{actual:#?}\n\n" ); + // TODO: Apply EnforceSorting first after the https://github.com/synnada-ai/arrow-datafusion/pull/165 + // is merged. + // // Run EnforceSorting - JoinSelection + // let optimized_physical_plan_2 = + // EnforceSorting::new().optimize(physical_plan.clone(), state.config_options())?; + // let optimized_physical_plan_2 = JoinSelection::new().optimize(optimized_physical_plan_2.clone(), state.config_options())?; + // + // assert_eq!(physical_plan.schema(), optimized_physical_plan_2.schema()); + // + // // Get string representation of the plan + // let actual = get_plan_string(&optimized_physical_plan_2); + // assert_eq!( + // expected_optimized_lines, actual, + // "\n**EnforceSorting - JoinSelection Optimized Plan Mismatch\n\nexpected:\n\n{expected_optimized_lines:#?}\nactual:\n\n{actual:#?}\n\n" + // ); }; } diff --git a/datafusion/physical-plan/src/joins/sliding_hash_join.rs b/datafusion/physical-plan/src/joins/sliding_hash_join.rs index 4c478915f7ee..d85c9528d83b 100644 --- a/datafusion/physical-plan/src/joins/sliding_hash_join.rs +++ b/datafusion/physical-plan/src/joins/sliding_hash_join.rs @@ -38,7 +38,7 @@ use crate::physical_plan::joins::{ check_if_sliding_window_condition_is_met, get_probe_batch, is_batch_suitable_interval_calculation, JoinStreamState, }, - symmetric_hash_join::{build_join_indices, OneSideHashJoiner}, + symmetric_hash_join::OneSideHashJoiner, utils::{ build_batch_from_indices, build_join_schema, calculate_join_output_ordering, check_join_is_valid, combine_join_equivalence_properties, @@ -63,6 +63,7 @@ use datafusion_execution::TaskContext; use datafusion_physical_expr::intervals::ExprIntervalGraph; use datafusion_physical_expr::{OrderingEquivalenceProperties, PhysicalSortRequirement}; +use crate::physical_plan::joins::hash_join::build_equal_condition_join_indices; use ahash::RandomState; use futures::{ready, Stream, StreamExt}; use parking_lot::Mutex; @@ -391,8 +392,8 @@ impl ExecutionPlan for SlidingHashJoinExec { ] } - fn benefits_from_input_partitioning(&self) -> bool { - false + fn benefits_from_input_partitioning(&self) -> Vec { + vec![false; 2] } fn equivalence_properties(&self) -> EquivalenceProperties { @@ -676,18 +677,18 @@ fn join_with_probe_batch( } // Calculates the indices to use for build and probe sides of the join: - let (build_indices, probe_indices) = build_join_indices( - probe_batch, + let (build_indices, probe_indices) = build_equal_condition_join_indices( &build_hash_joiner.hashmap, &build_hash_joiner.input_buffer, + probe_batch, &build_hash_joiner.on, &probe_hash_joiner.on, - Some(filter), random_state, null_equals_null, &mut build_hash_joiner.hashes_buffer, - Some(build_hash_joiner.deleted_offset), + Some(filter), JoinSide::Left, + Some(build_hash_joiner.deleted_offset), )?; // Record indices of the rows that were visited (if required by the join type): @@ -1242,8 +1243,8 @@ mod tests { let (left, right) = create_memory_table( left_batch, right_batch, - Some(left_sorted), - Some(right_sorted), + vec![left_sorted], + vec![right_sorted], batch_size, )?; @@ -1300,8 +1301,8 @@ mod tests { let (left, right) = create_memory_table( left_batch, right_batch, - Some(left_sorted), - Some(right_sorted), + vec![left_sorted], + vec![right_sorted], 13, )?; @@ -1365,8 +1366,8 @@ mod tests { let (left, right) = create_memory_table( left_batch, right_batch, - Some(left_sorted), - Some(right_sorted), + vec![left_sorted], + vec![right_sorted], 13, )?; @@ -1429,8 +1430,8 @@ mod tests { let (left, right) = create_memory_table( left_batch, right_batch, - Some(left_sorted), - Some(right_sorted), + vec![left_sorted], + vec![right_sorted], 13, )?; @@ -1510,8 +1511,8 @@ mod tests { let (left, right) = create_memory_table( left_batch, right_batch, - Some(left_sorted), - Some(right_sorted), + vec![left_sorted], + vec![right_sorted], 13, )?; @@ -1590,8 +1591,8 @@ mod tests { let (left, right) = create_memory_table( left_batch, right_batch, - Some(left_sorted), - Some(right_sorted), + vec![left_sorted], + vec![right_sorted], 13, )?; diff --git a/datafusion/physical-plan/src/joins/sliding_nested_loop_join.rs b/datafusion/physical-plan/src/joins/sliding_nested_loop_join.rs index 1a1cf4fc6220..bf88373047dc 100644 --- a/datafusion/physical-plan/src/joins/sliding_nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/sliding_nested_loop_join.rs @@ -1273,8 +1273,8 @@ mod tests { let (left, right) = create_memory_table( left_batch, right_batch, - Some(left_sorted), - Some(right_sorted), + vec![left_sorted], + vec![right_sorted], batch_size, )?; @@ -1333,8 +1333,8 @@ mod tests { let (left, right) = create_memory_table( left_batch, right_batch, - Some(left_sorted), - Some(right_sorted), + vec![left_sorted], + vec![right_sorted], 13, )?; @@ -1392,8 +1392,8 @@ mod tests { let (left, right) = create_memory_table( left_batch, right_batch, - Some(left_sorted), - Some(right_sorted), + vec![left_sorted], + vec![right_sorted], 13, )?; @@ -1466,8 +1466,8 @@ mod tests { let (left, right) = create_memory_table( left_batch, right_batch, - Some(left_sorted), - Some(right_sorted), + vec![left_sorted], + vec![right_sorted], 13, )?; @@ -1539,8 +1539,8 @@ mod tests { let (left, right) = create_memory_table( left_batch, right_batch, - Some(left_sorted), - Some(right_sorted), + vec![left_sorted], + vec![right_sorted], 13, )?; diff --git a/datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt b/datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt index 37099b3a184b..0d0e0f8f444d 100644 --- a/datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt +++ b/datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt @@ -42,7 +42,7 @@ CREATE UNBOUNDED EXTERNAL TABLE IF NOT EXISTS orders ( o_rev VARCHAR, ) STORED AS CSV DELIMITER '|' WITH ORDER (o_orderkey ASC) - LOCATION 'tests/tpch-csv/orders.csv'; + LOCATION '../core/tests/tpch-csv/orders.csv'; statement ok CREATE UNBOUNDED EXTERNAL TABLE IF NOT EXISTS lineitem ( @@ -65,7 +65,7 @@ CREATE UNBOUNDED EXTERNAL TABLE IF NOT EXISTS lineitem ( l_rev VARCHAR, ) STORED AS CSV DELIMITER '|' WITH ORDER (l_orderkey) - LOCATION 'tests/tpch-csv/lineitem.csv'; + LOCATION '../core/tests/tpch-csv/lineitem.csv'; statement ok CREATE UNBOUNDED EXTERNAL TABLE IF NOT EXISTS customer ( @@ -78,7 +78,7 @@ CREATE UNBOUNDED EXTERNAL TABLE IF NOT EXISTS customer ( c_mktsegment VARCHAR, c_comment VARCHAR, c_rev VARCHAR, - ) STORED AS CSV DELIMITER '|' LOCATION 'tests/tpch-csv/customer.csv' + ) STORED AS CSV DELIMITER '|' LOCATION '../core/tests/tpch-csv/customer.csv' WITH ORDER (c_custkey ASC); statement ok @@ -90,7 +90,7 @@ CREATE UNBOUNDED EXTERNAL TABLE IF NOT EXISTS nation ( n_rev VARCHAR, ) STORED AS CSV DELIMITER '|' WITH ORDER (n_nationkey ASC) - LOCATION 'tests/tpch-csv/nation.csv'; + LOCATION '../core/tests/tpch-csv/nation.csv'; # explain multiple sort merge join replacement query TT @@ -381,13 +381,11 @@ Limit: skip=0, fetch=5 ----------TableScan: annotated_data projection=[a, c] physical_plan GlobalLimitExec: skip=0, fetch=5 ---SortPreservingMergeExec: [a@0 ASC NULLS LAST], fetch=5 -----ProjectionExec: expr=[a@1 as a] -------CoalesceBatchesExec: target_batch_size=8192 ---------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(c@0, c@1)] -----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c], has_header=true -----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_ordering=[a@0 ASC NULLS LAST], has_header=true +--ProjectionExec: expr=[a@1 as a] +----CoalesceBatchesExec: target_batch_size=8192 +------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(c@0, c@1)] +--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c], has_header=true +--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_ordering=[a@0 ASC NULLS LAST], has_header=true # preserve_inner_join query IIII nosort