From f2b29d999d1d7ea7c8d98bc85d3dc70d8eeb18c1 Mon Sep 17 00:00:00 2001 From: Batuhan Taskaya Date: Wed, 12 Oct 2022 22:25:15 +0300 Subject: [PATCH] Addressing review feedback --- .../physical_optimizer/hash_build_probe_order.rs | 15 +++++++++++++++ datafusion/core/src/physical_plan/hash_join.rs | 6 +++--- datafusion/core/src/physical_plan/join_utils.rs | 14 +++++++------- 3 files changed, 25 insertions(+), 10 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/hash_build_probe_order.rs b/datafusion/core/src/physical_optimizer/hash_build_probe_order.rs index d6694d4bca6a9..6a94250741a86 100644 --- a/datafusion/core/src/physical_optimizer/hash_build_probe_order.rs +++ b/datafusion/core/src/physical_optimizer/hash_build_probe_order.rs @@ -229,6 +229,11 @@ mod tests { (big, small) } + /// Create a column statistics vector for a single column + /// that has the given min/max/distinct_count properties. + /// + /// Given min/max will be mapped to a [`ScalarValue`] if + /// they are not `None`. fn create_column_stats( min: Option, max: Option, @@ -242,6 +247,10 @@ mod tests { }]) } + /// Returns three plans with statistics of (min, max, distinct_count) + /// * big 100K rows @ (0, 50k, 50k) + /// * medium 10K rows @ (1k, 5k, 1k) + /// * small 1K rows @ (0, 100k, 1k) fn create_nested_with_min_max() -> ( Arc, Arc, @@ -362,6 +371,7 @@ mod tests { async fn test_nested_join_swap() { let (big, medium, small) = create_nested_with_min_max(); + // Form the inner join: big JOIN small let child_join = HashJoinExec::try_new( Arc::clone(&big), Arc::clone(&small), @@ -377,6 +387,7 @@ mod tests { .unwrap(); let child_schema = child_join.schema(); + // Form join tree `medium LEFT JOIN (big JOIN small)` let join = HashJoinExec::try_new( Arc::clone(&medium), Arc::new(child_join), @@ -391,6 +402,10 @@ mod tests { ) .unwrap(); + // Hash join uses the left side to build the hash table, and right side to probe it. 
We want + // to keep left as small as possible, so if we can estimate (with a reasonable margin of error) + // that the left side is larger than the right side, we should swap the sides. + // // The first hash join's left is 'small' table (with 1000 rows), and the second hash join's // left is the F(small IJ big) which has an estimated cardinality of 2000 rows (vs medium which // has an exact cardinality of 10_000 rows). diff --git a/datafusion/core/src/physical_plan/hash_join.rs b/datafusion/core/src/physical_plan/hash_join.rs index 7c06aa97cbc2d..c0b8eaea34aad 100644 --- a/datafusion/core/src/physical_plan/hash_join.rs +++ b/datafusion/core/src/physical_plan/hash_join.rs @@ -59,8 +59,8 @@ use super::{ coalesce_partitions::CoalescePartitionsExec, expressions::PhysicalSortExpr, join_utils::{ - build_join_schema, check_join_is_valid, join_statistics, ColumnIndex, JoinFilter, - JoinOn, JoinSide, + build_join_schema, check_join_is_valid, estimate_join_statistics, ColumnIndex, + JoinFilter, JoinOn, JoinSide, }, }; use super::{ @@ -386,7 +386,7 @@ impl ExecutionPlan for HashJoinExec { // TODO stats: it is not possible in general to know the output size of joins // There are some special cases though, for example: // - `A LEFT JOIN B ON A.col=B.col` with `COUNT_DISTINCT(B.col)=COUNT(B.col)` - join_statistics( + estimate_join_statistics( self.left.clone(), self.right.clone(), self.on.clone(), diff --git a/datafusion/core/src/physical_plan/join_utils.rs b/datafusion/core/src/physical_plan/join_utils.rs index 2aee190044f23..a63f39e51b296 100644 --- a/datafusion/core/src/physical_plan/join_utils.rs +++ b/datafusion/core/src/physical_plan/join_utils.rs @@ -307,8 +307,8 @@ struct PartialJoinStatistics { pub column_statistics: Vec, } -/// Calculate the statistics for the given join's output. -pub(crate) fn join_statistics( +/// Estimate the statistics for the given join's output. 
+pub(crate) fn estimate_join_statistics( left: Arc, right: Arc, on: JoinOn, @@ -423,17 +423,17 @@ fn inner_join_cardinality( return None; } - let col_selectivity = max(left_stat.distinct_count, right_stat.distinct_count); - if col_selectivity > join_selectivity { + let max_distinct = max(left_stat.distinct_count, right_stat.distinct_count); + if max_distinct > join_selectivity { // Seems like there are a few implementations of this algorithm that implement // exponential decay for the selectivity (like Hive's Optiq Optimizer). Needs // further exploration. - join_selectivity = col_selectivity; + join_selectivity = max_distinct; } } // With the assumption that the smaller input's domain is generally represented in the bigger - // input's domain, we can calculate the inner join's cardinality by taking the cartesian product + // input's domain, we can estimate the inner join's cardinality by taking the cartesian product // of the two inputs and normalizing it by the selectivity factor. let cardinality = match join_selectivity { Some(selectivity) if selectivity > 0 => { @@ -640,7 +640,7 @@ mod tests { type PartialStats = (usize, u64, u64, Option); - // This is mainly for validating the all edge cases of the calculation, but + // This is mainly for validating all the edge cases of the estimation, but // more advanced (and real world test cases) are below where we need some control // over the expected output (since it depends on join type to join type). #[test]