Skip to content

Commit

Permalink
Addressing review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
isidentical committed Oct 12, 2022
1 parent aa76e42 commit f2b29d9
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 10 deletions.
15 changes: 15 additions & 0 deletions datafusion/core/src/physical_optimizer/hash_build_probe_order.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,11 @@ mod tests {
(big, small)
}

/// Create a column statistics vector for a single column
/// that has the given min/max/distinct_count properties.
///
/// Given min/max will be mapped to a [`ScalarValue`] if
/// they are not `None`.
fn create_column_stats(
min: Option<u64>,
max: Option<u64>,
Expand All @@ -242,6 +247,10 @@ mod tests {
}])
}

/// Returns three plans with statistics of (min, max, distinct_count)
/// * big 100K rows @ (0, 50k, 50k)
/// * medium 10K rows @ (1k, 5k, 1k)
/// * small 1K rows @ (0, 100k, 1k)
fn create_nested_with_min_max() -> (
Arc<dyn ExecutionPlan>,
Arc<dyn ExecutionPlan>,
Expand Down Expand Up @@ -362,6 +371,7 @@ mod tests {
async fn test_nested_join_swap() {
let (big, medium, small) = create_nested_with_min_max();

// Form the inner join: big JOIN small
let child_join = HashJoinExec::try_new(
Arc::clone(&big),
Arc::clone(&small),
Expand All @@ -377,6 +387,7 @@ mod tests {
.unwrap();
let child_schema = child_join.schema();

// Form join tree `medium LEFT JOIN (big JOIN small)`
let join = HashJoinExec::try_new(
Arc::clone(&medium),
Arc::new(child_join),
Expand All @@ -391,6 +402,10 @@ mod tests {
)
.unwrap();

// Hash join uses the left side to build the hash table, and the right side to probe it. We want
// to keep the left side as small as possible, so if we can estimate (with a reasonable margin of
// error) that the left side is larger than the right side, we should swap the sides.
//
// The first hash join's left is the 'small' table (with 1000 rows), and the second hash join's
// left is F(small IJ big), which has an estimated cardinality of 2000 rows (vs medium, which
// has an exact cardinality of 10_000 rows).
Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/src/physical_plan/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ use super::{
coalesce_partitions::CoalescePartitionsExec,
expressions::PhysicalSortExpr,
join_utils::{
build_join_schema, check_join_is_valid, join_statistics, ColumnIndex, JoinFilter,
JoinOn, JoinSide,
build_join_schema, check_join_is_valid, estimate_join_statistics, ColumnIndex,
JoinFilter, JoinOn, JoinSide,
},
};
use super::{
Expand Down Expand Up @@ -386,7 +386,7 @@ impl ExecutionPlan for HashJoinExec {
// TODO stats: it is not possible in general to know the output size of joins
// There are some special cases though, for example:
// - `A LEFT JOIN B ON A.col=B.col` with `COUNT_DISTINCT(B.col)=COUNT(B.col)`
join_statistics(
estimate_join_statistics(
self.left.clone(),
self.right.clone(),
self.on.clone(),
Expand Down
14 changes: 7 additions & 7 deletions datafusion/core/src/physical_plan/join_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,8 +307,8 @@ struct PartialJoinStatistics {
pub column_statistics: Vec<ColumnStatistics>,
}

/// Calculate the statistics for the given join's output.
pub(crate) fn join_statistics(
/// Estimate the statistics for the given join's output.
pub(crate) fn estimate_join_statistics(
left: Arc<dyn ExecutionPlan>,
right: Arc<dyn ExecutionPlan>,
on: JoinOn,
Expand Down Expand Up @@ -423,17 +423,17 @@ fn inner_join_cardinality(
return None;
}

let col_selectivity = max(left_stat.distinct_count, right_stat.distinct_count);
if col_selectivity > join_selectivity {
let max_distinct = max(left_stat.distinct_count, right_stat.distinct_count);
if max_distinct > join_selectivity {
// Seems like there are a few implementations of this algorithm that implement
// exponential decay for the selectivity (like Hive's Optiq Optimizer). Needs
// further exploration.
join_selectivity = col_selectivity;
join_selectivity = max_distinct;
}
}

// With the assumption that the smaller input's domain is generally represented in the bigger
// input's domain, we can calculate the inner join's cardinality by taking the cartesian product
// input's domain, we can estimate the inner join's cardinality by taking the cartesian product
// of the two inputs and normalizing it by the selectivity factor.
let cardinality = match join_selectivity {
Some(selectivity) if selectivity > 0 => {
Expand Down Expand Up @@ -640,7 +640,7 @@ mod tests {

type PartialStats = (usize, u64, u64, Option<usize>);

// This is mainly for validating the all edge cases of the calculation, but
// This is mainly for validating all the edge cases of the estimation, but
// more advanced (and real world test cases) are below where we need some control
// over the expected output (since it depends on join type to join type).
#[test]
Expand Down

0 comments on commit f2b29d9

Please sign in to comment.