Skip to content

Commit

Permalink
Addressing review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
isidentical committed Oct 12, 2022
1 parent aa76e42 commit 30f6dd7
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 18 deletions.
15 changes: 15 additions & 0 deletions datafusion/core/src/physical_optimizer/hash_build_probe_order.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,11 @@ mod tests {
(big, small)
}

/// Create a column statistics vector for a single column
/// that has the given min/max/distinct_count properties.
///
/// Given min/max will be mapped to a [`ScalarValue`] if
/// they are not `None`.
fn create_column_stats(
min: Option<u64>,
max: Option<u64>,
Expand All @@ -242,6 +247,10 @@ mod tests {
}])
}

/// Returns three plans with statistics of (min, max, distinct_count)
/// * big 100K rows @ (0, 50k, 50k)
/// * medium 10K rows @ (1k, 5k, 1k)
/// * small 1K rows @ (0, 100k, 1k)
fn create_nested_with_min_max() -> (
Arc<dyn ExecutionPlan>,
Arc<dyn ExecutionPlan>,
Expand Down Expand Up @@ -362,6 +371,7 @@ mod tests {
async fn test_nested_join_swap() {
let (big, medium, small) = create_nested_with_min_max();

// Form the inner join: big JOIN small
let child_join = HashJoinExec::try_new(
Arc::clone(&big),
Arc::clone(&small),
Expand All @@ -377,6 +387,7 @@ mod tests {
.unwrap();
let child_schema = child_join.schema();

// Form join tree `medium LEFT JOIN (big JOIN small)`
let join = HashJoinExec::try_new(
Arc::clone(&medium),
Arc::new(child_join),
Expand All @@ -391,6 +402,10 @@ mod tests {
)
.unwrap();

// Hash join uses the left side to build the hash table, and the right side to probe
// it. We want to keep the left side as small as possible, so if we can estimate
// (with a reasonable margin of error) that the left side is smaller than the right
// side, we should swap the sides.
//
//
// The first hash join's left is 'small' table (with 1000 rows), and the second hash join's
// left is the F(small IJ big) which has an estimated cardinality of 2000 rows (vs medium which
// has an exact cardinality of 10_000 rows).
Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/src/physical_plan/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ use super::{
coalesce_partitions::CoalescePartitionsExec,
expressions::PhysicalSortExpr,
join_utils::{
build_join_schema, check_join_is_valid, join_statistics, ColumnIndex, JoinFilter,
JoinOn, JoinSide,
build_join_schema, check_join_is_valid, estimate_join_statistics, ColumnIndex,
JoinFilter, JoinOn, JoinSide,
},
};
use super::{
Expand Down Expand Up @@ -386,7 +386,7 @@ impl ExecutionPlan for HashJoinExec {
// TODO stats: it is not possible in general to know the output size of joins
// There are some special cases though, for example:
// - `A LEFT JOIN B ON A.col=B.col` with `COUNT_DISTINCT(B.col)=COUNT(B.col)`
join_statistics(
estimate_join_statistics(
self.left.clone(),
self.right.clone(),
self.on.clone(),
Expand Down
30 changes: 15 additions & 15 deletions datafusion/core/src/physical_plan/join_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,8 +307,8 @@ struct PartialJoinStatistics {
pub column_statistics: Vec<ColumnStatistics>,
}

/// Calculate the statistics for the given join's output.
pub(crate) fn join_statistics(
/// Estimate the statistics for the given join's output.
pub(crate) fn estimate_join_statistics(
left: Arc<dyn ExecutionPlan>,
right: Arc<dyn ExecutionPlan>,
on: JoinOn,
Expand All @@ -317,7 +317,7 @@ pub(crate) fn join_statistics(
let left_stats = left.statistics();
let right_stats = right.statistics();

let join_stats = join_cardinality(join_type, left_stats, right_stats, &on);
let join_stats = estimate_join_cardinality(join_type, left_stats, right_stats, &on);
let (num_rows, column_statistics) = match join_stats {
Some(stats) => (Some(stats.num_rows), Some(stats.column_statistics)),
None => (None, None),
Expand All @@ -331,7 +331,7 @@ pub(crate) fn join_statistics(
}

// Estimate the cardinality for the given join with input statistics.
fn join_cardinality(
fn estimate_join_cardinality(
join_type: &JoinType,
left_stats: Statistics,
right_stats: Statistics,
Expand All @@ -356,7 +356,7 @@ fn join_cardinality(
})
.unzip::<_, _, Vec<_>, Vec<_>>();

let ij_cardinality = inner_join_cardinality(
let ij_cardinality = estimate_inner_join_cardinality(
left_num_rows,
right_num_rows,
left_col_stats,
Expand Down Expand Up @@ -401,7 +401,7 @@ fn join_cardinality(
/// column-level statistics and the total row count. This is a very naive and
/// very conservative implementation that can quickly give up if there are not
/// enough input statistics.
fn inner_join_cardinality(
fn estimate_inner_join_cardinality(
left_num_rows: usize,
right_num_rows: usize,
left_col_stats: Vec<ColumnStatistics>,
Expand All @@ -423,17 +423,17 @@ fn inner_join_cardinality(
return None;
}

let col_selectivity = max(left_stat.distinct_count, right_stat.distinct_count);
if col_selectivity > join_selectivity {
let max_distinct = max(left_stat.distinct_count, right_stat.distinct_count);
if max_distinct > join_selectivity {
// Seems like there are a few implementations of this algorithm that implement
// exponential decay for the selectivity (like Hive's Optiq Optimizer). Needs
// further exploration.
join_selectivity = col_selectivity;
join_selectivity = max_distinct;
}
}

// With the assumption that the smaller input's domain is generally represented in the bigger
// input's domain, we can calculate the inner join's cardinality by taking the cartesian product
// input's domain, we can estimate the inner join's cardinality by taking the cartesian product
// of the two inputs and normalizing it by the selectivity factor.
let cardinality = match join_selectivity {
Some(selectivity) if selectivity > 0 => {
Expand Down Expand Up @@ -640,7 +640,7 @@ mod tests {

type PartialStats = (usize, u64, u64, Option<usize>);

// This is mainly for validating the all edge cases of the calculation, but
// This is mainly for validating all the edge cases of the estimation, but
// more advanced (and real-world) test cases are below, where we need some control
// over the expected output (since it varies from join type to join type).
#[test]
Expand Down Expand Up @@ -688,7 +688,7 @@ mod tests {
)];

assert_eq!(
inner_join_cardinality(
estimate_inner_join_cardinality(
left_num_rows,
right_num_rows,
left_col_stats.clone(),
Expand All @@ -700,7 +700,7 @@ mod tests {
// We should also be able to use join_cardinality to get the same results
let join_type = JoinType::Inner;
let join_on = vec![(Column::new("a", 0), Column::new("b", 0))];
let partial_join_stats = join_cardinality(
let partial_join_stats = estimate_join_cardinality(
&join_type,
create_stats(Some(left_num_rows), Some(left_col_stats.clone())),
create_stats(Some(right_num_rows), Some(right_col_stats.clone())),
Expand Down Expand Up @@ -734,7 +734,7 @@ mod tests {
// We have statistics about 4 columns, where the highest distinct
// count is 200, so we are going to pick it.
assert_eq!(
inner_join_cardinality(400, 400, left_col_stats, right_col_stats),
estimate_inner_join_cardinality(400, 400, left_col_stats, right_col_stats),
Some((400 * 400) / 200)
);
Ok(())
Expand Down Expand Up @@ -778,7 +778,7 @@ mod tests {
(Column::new("b", 1), Column::new("d", 1)),
];

let partial_join_stats = join_cardinality(
let partial_join_stats = estimate_join_cardinality(
&join_type,
create_stats(Some(1000), Some(left_col_stats.clone())),
create_stats(Some(2000), Some(right_col_stats.clone())),
Expand Down

0 comments on commit 30f6dd7

Please sign in to comment.