Support for Sliding Windows Joins with Symmetric Hash Join (SHJ) (#5322)
* Prunable symmetric hash join implementation

* Minor changes after merge

* Filter mapping inside SymmetricHashJoin

* Commenting on

* Minor changes after merge

* Simplify interval arithmetic library code

* Make the interval arithmetic library more robust

* After merge corrections

* Simplifications to constraint propagation code

* Revamp some API's and enhance comments

- Utilize estimate_bounds without propagation for better API.
- Remove coupling between node_index & PhysicalExpr pairing and graph.
- Better commenting on symmetric hash join while using graph

* Resolve a propagation bug and make propagation return an optional status

* Refactor and simplify CP code, improve comments

* Code deduplication between pipeline fixer and utils, also enhance comments.

* Refactor on input stream consumer on SymmetricHashJoin

* After merge resolution, before proto update

* Revert unnecessary changes in some exprs

Also, cargo.lock update.

* Remove support indicators to interval library, rename module to use the standard name

* Simplify PipelineFixer, remove clones, improve comments

* Enhance the symmetric hash join code with reviews

* Revamp according to reviews

* Use a simple, stateless, one-liner DFS to check for IA support

* Move test function to a test_utils module

* Simplify DAG creation code

* Reducing code change

* Comment improvements and simplifications

* Revamp SortedFilterExpr usage and enhance comments

* Update fifo.rs

* Remove unnecessary clones, improve comments and code structure

* Remove leaf searches from CP iterations, improve code
organization/comments

* Bug fix in cp_solver, revamp some comments

* Update with correct testing

* Test for future support on fuzzy matches between exprs

* Compute connected nodes in CP solver via a DFS, improve comments

* Revamp OneSideHashJoin constructor and new unit test

* Update on concat_batches usage

* Revamping according to comments.

* Simplifications, refactoring

* Minor fix

* Fix typo in the new_zero function

---------

Co-authored-by: Mehmet Ozan Kabak <[email protected]>
metesynnada and ozankabak authored Mar 1, 2023
1 parent 03fbf9f commit 3c1e4c0
Showing 26 changed files with 5,653 additions and 201 deletions.
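Before the file-by-file diff, a brief orientation: the core idea behind using a symmetric hash join for sliding-window joins is that when the join filter bounds how far apart the two (sorted) sides can be, each side can prune buffered rows that can no longer match any future row from the other side. The sketch below is illustrative only — it is not part of this commit and does not use DataFusion APIs; all names in it are made up.

// Minimal sketch of the pruning idea, assuming both inputs arrive sorted on an
// integer key `ts` and the join filter is the sliding window
//     right.ts >= left.ts - WINDOW  AND  right.ts <= left.ts + WINDOW
// Because the probe side only moves forward, a buffered right row with
// ts < latest_left_ts - WINDOW can never satisfy the filter again and can be
// finalized and dropped. The symmetric hash join applies this to both sides.
fn prune_right_buffer(right_buffer: &mut Vec<i64>, latest_left_ts: i64, window: i64) {
    right_buffer.retain(|&right_ts| right_ts >= latest_left_ts - window);
}

fn main() {
    let mut right_buffer = vec![1, 5, 12, 20, 31];
    prune_right_buffer(&mut right_buffer, 25, 10); // probe side has reached ts = 25
    assert_eq!(right_buffer, vec![20, 31]);        // 1, 5 and 12 can never match again
}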
17 changes: 17 additions & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default.

23 changes: 23 additions & 0 deletions datafusion/common/src/scalar.rs
@@ -1019,6 +1019,29 @@ impl ScalarValue {
Self::List(scalars, Box::new(Field::new("item", child_type, true)))
}

/// Create a zero value in the given type.
pub fn new_zero(datatype: &DataType) -> Result<ScalarValue> {
assert!(datatype.is_primitive());
Ok(match datatype {
DataType::Boolean => ScalarValue::Boolean(Some(false)),
DataType::Int8 => ScalarValue::Int8(Some(0)),
DataType::Int16 => ScalarValue::Int16(Some(0)),
DataType::Int32 => ScalarValue::Int32(Some(0)),
DataType::Int64 => ScalarValue::Int64(Some(0)),
DataType::UInt8 => ScalarValue::UInt8(Some(0)),
DataType::UInt16 => ScalarValue::UInt16(Some(0)),
DataType::UInt32 => ScalarValue::UInt32(Some(0)),
DataType::UInt64 => ScalarValue::UInt64(Some(0)),
DataType::Float32 => ScalarValue::Float32(Some(0.0)),
DataType::Float64 => ScalarValue::Float64(Some(0.0)),
_ => {
return Err(DataFusionError::NotImplemented(format!(
"Can't create a zero scalar from data_type \"{datatype:?}\""
)));
}
})
}

/// Getter for the `DataType` of the value
pub fn get_datatype(&self) -> DataType {
match self {
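As a quick aside (not part of the diff), here is a usage sketch for the new ScalarValue::new_zero helper shown above; the assertions simply mirror the match arms in the function:

use arrow::datatypes::DataType;
use datafusion_common::{Result, ScalarValue};

// Illustrative only: build typed zero scalars for a couple of primitive types.
fn zero_scalars() -> Result<()> {
    assert_eq!(
        ScalarValue::new_zero(&DataType::Int32)?,
        ScalarValue::Int32(Some(0))
    );
    assert_eq!(
        ScalarValue::new_zero(&DataType::Float64)?,
        ScalarValue::Float64(Some(0.0))
    );
    Ok(())
}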
3 changes: 3 additions & 0 deletions datafusion/core/src/execution/context.rs
@@ -1584,6 +1584,9 @@ impl SessionState {
// repartitioning and local sorting steps to meet distribution and ordering requirements.
// Therefore, it should run before EnforceDistribution and EnforceSorting.
Arc::new(JoinSelection::new()),
// Enforce sort before PipelineFixer
Arc::new(EnforceDistribution::new()),
Arc::new(EnforceSorting::new()),
// If the query is processing infinite inputs, the PipelineFixer rule applies the
// necessary transformations to make the query runnable (if it is not already runnable).
// If the query can not be made runnable, the rule emits an error with a diagnostic message.
4 changes: 2 additions & 2 deletions datafusion/core/src/physical_optimizer/pipeline_checker.rs
@@ -305,7 +305,7 @@ mod sql_tests {
FROM test
LIMIT 5".to_string(),
cases: vec![Arc::new(test1), Arc::new(test2)],
error_operator: "Window Error".to_string()
error_operator: "Sort Error".to_string()
};

case.run().await?;
@@ -328,7 +328,7 @@
SUM(c9) OVER(ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING) as sum1
FROM test".to_string(),
cases: vec![Arc::new(test1), Arc::new(test2)],
error_operator: "Window Error".to_string()
error_operator: "Sort Error".to_string()
};
case.run().await?;
Ok(())
173 changes: 159 additions & 14 deletions datafusion/core/src/physical_optimizer/pipeline_fixer.rs
@@ -29,11 +29,19 @@ use crate::physical_optimizer::pipeline_checker::{
check_finiteness_requirements, PipelineStatePropagator,
};
use crate::physical_optimizer::PhysicalOptimizerRule;
use crate::physical_plan::joins::{HashJoinExec, PartitionMode};
use crate::physical_plan::joins::utils::JoinSide;
use crate::physical_plan::joins::{
convert_sort_expr_with_filter_schema, HashJoinExec, PartitionMode,
SymmetricHashJoinExec,
};
use crate::physical_plan::rewrite::TreeNodeRewritable;
use crate::physical_plan::ExecutionPlan;
use datafusion_common::DataFusionError;
use datafusion_expr::logical_plan::JoinType;
use datafusion_physical_expr::expressions::{BinaryExpr, CastExpr, Column, Literal};
use datafusion_physical_expr::intervals::{is_datatype_supported, is_operator_supported};
use datafusion_physical_expr::PhysicalExpr;

use std::sync::Arc;

/// The [PipelineFixer] rule tries to modify a given plan so that it can
@@ -48,17 +56,24 @@ impl PipelineFixer {
Self {}
}
}
/// [PipelineFixer] subrules are functions of this type. Such functions take a
/// single [PipelineStatePropagator] argument, which stores state variables
/// indicating the unboundedness status of the current [ExecutionPlan] as
/// the [PipelineFixer] rule traverses the entire plan tree.
type PipelineFixerSubrule =
dyn Fn(&PipelineStatePropagator) -> Option<Result<PipelineStatePropagator>>;
dyn Fn(PipelineStatePropagator) -> Option<Result<PipelineStatePropagator>>;

impl PhysicalOptimizerRule for PipelineFixer {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
_config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
let pipeline = PipelineStatePropagator::new(plan);
let physical_optimizer_subrules: Vec<Box<PipelineFixerSubrule>> =
vec![Box::new(hash_join_swap_subrule)];
let physical_optimizer_subrules: Vec<Box<PipelineFixerSubrule>> = vec![
Box::new(hash_join_convert_symmetric_subrule),
Box::new(hash_join_swap_subrule),
];
let state = pipeline.transform_up(&|p| {
apply_subrules_and_check_finiteness_requirements(
p,
@@ -77,6 +92,104 @@ impl PhysicalOptimizerRule for PipelineFixer {
}
}

/// Indicates whether interval arithmetic is supported for the given expression.
/// Currently, we do not support all [PhysicalExpr]s for interval calculations.
/// We do not support every type of [Operator] either. Over time, this check
/// will relax as more types of [PhysicalExpr]s and [Operator]s are supported.
/// Currently, [CastExpr], [BinaryExpr], [Column] and [Literal] are supported.
fn check_support(expr: &Arc<dyn PhysicalExpr>) -> bool {
let expr_any = expr.as_any();
let expr_supported = if let Some(binary_expr) = expr_any.downcast_ref::<BinaryExpr>()
{
is_operator_supported(binary_expr.op())
} else {
expr_any.is::<Column>() || expr_any.is::<Literal>() || expr_any.is::<CastExpr>()
};
expr_supported && expr.children().iter().all(check_support)
}

/// This function returns whether a given hash join is replaceable by a
/// symmetric hash join. Basically, the requirement is that involved
/// [PhysicalExpr]s, [Operator]s and data types need to be supported,
/// and order information must cover every column in the filter expression.
fn is_suitable_for_symmetric_hash_join(hash_join: &HashJoinExec) -> Result<bool> {
if let Some(filter) = hash_join.filter() {
let left = hash_join.left();
if let Some(left_ordering) = left.output_ordering() {
let right = hash_join.right();
if let Some(right_ordering) = right.output_ordering() {
let expr_supported = check_support(filter.expression());
let left_convertible = convert_sort_expr_with_filter_schema(
&JoinSide::Left,
filter,
&left.schema(),
&left_ordering[0],
)?
.is_some();
let right_convertible = convert_sort_expr_with_filter_schema(
&JoinSide::Right,
filter,
&right.schema(),
&right_ordering[0],
)?
.is_some();
let fields_supported = filter
.schema()
.fields()
.iter()
.all(|f| is_datatype_supported(f.data_type()));
return Ok(expr_supported
&& fields_supported
&& left_convertible
&& right_convertible);
}
}
}
Ok(false)
}

/// This subrule checks if one can replace a hash join with a symmetric hash
/// join so that the pipeline does not break due to the join operation in
/// question. If possible, it makes this replacement; otherwise, it has no
/// effect.
fn hash_join_convert_symmetric_subrule(
input: PipelineStatePropagator,
) -> Option<Result<PipelineStatePropagator>> {
let plan = input.plan;
if let Some(hash_join) = plan.as_any().downcast_ref::<HashJoinExec>() {
let ub_flags = input.children_unbounded;
let (left_unbounded, right_unbounded) = (ub_flags[0], ub_flags[1]);
let new_plan = if left_unbounded && right_unbounded {
match is_suitable_for_symmetric_hash_join(hash_join) {
Ok(true) => SymmetricHashJoinExec::try_new(
hash_join.left().clone(),
hash_join.right().clone(),
hash_join
.on()
.iter()
.map(|(l, r)| (l.clone(), r.clone()))
.collect(),
hash_join.filter().unwrap().clone(),
hash_join.join_type(),
hash_join.null_equals_null(),
)
.map(|e| Arc::new(e) as _),
Ok(false) => Ok(plan),
Err(e) => return Some(Err(e)),
}
} else {
Ok(plan)
};
Some(new_plan.map(|plan| PipelineStatePropagator {
plan,
unbounded: left_unbounded || right_unbounded,
children_unbounded: ub_flags,
}))
} else {
None
}
}

/// This subrule will swap build/probe sides of a hash join depending on whether its inputs
/// may produce an infinite stream of records. The rule ensures that the left (build) side
/// of the hash join always operates on an input stream that will produce a finite set of.
@@ -119,12 +232,12 @@
///
/// ```
fn hash_join_swap_subrule(
input: &PipelineStatePropagator,
input: PipelineStatePropagator,
) -> Option<Result<PipelineStatePropagator>> {
let plan = input.plan.clone();
let children = &input.children_unbounded;
let plan = input.plan;
if let Some(hash_join) = plan.as_any().downcast_ref::<HashJoinExec>() {
let (left_unbounded, right_unbounded) = (children[0], children[1]);
let ub_flags = input.children_unbounded;
let (left_unbounded, right_unbounded) = (ub_flags[0], ub_flags[1]);
let new_plan = if left_unbounded && !right_unbounded {
if matches!(
*hash_join.join_type(),
Expand All @@ -140,12 +253,11 @@ fn hash_join_swap_subrule(
} else {
Ok(plan)
};
let new_state = new_plan.map(|plan| PipelineStatePropagator {
Some(new_plan.map(|plan| PipelineStatePropagator {
plan,
unbounded: left_unbounded || right_unbounded,
children_unbounded: vec![left_unbounded, right_unbounded],
});
Some(new_state)
children_unbounded: ub_flags,
}))
} else {
None
}
@@ -182,13 +294,46 @@ fn apply_subrules_and_check_finiteness_requirements(
physical_optimizer_subrules: &Vec<Box<PipelineFixerSubrule>>,
) -> Result<Option<PipelineStatePropagator>> {
for sub_rule in physical_optimizer_subrules {
if let Some(value) = sub_rule(&input).transpose()? {
if let Some(value) = sub_rule(input.clone()).transpose()? {
input = value;
}
}
check_finiteness_requirements(input)
}

#[cfg(test)]
mod util_tests {
use crate::physical_optimizer::pipeline_fixer::check_support;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{BinaryExpr, Column, NegativeExpr};
use datafusion_physical_expr::PhysicalExpr;
use std::sync::Arc;

#[test]
fn check_expr_supported() {
let supported_expr = Arc::new(BinaryExpr::new(
Arc::new(Column::new("a", 0)),
Operator::Plus,
Arc::new(Column::new("a", 0)),
)) as Arc<dyn PhysicalExpr>;
assert!(check_support(&supported_expr));
let supported_expr_2 = Arc::new(Column::new("a", 0)) as Arc<dyn PhysicalExpr>;
assert!(check_support(&supported_expr_2));
let unsupported_expr = Arc::new(BinaryExpr::new(
Arc::new(Column::new("a", 0)),
Operator::Or,
Arc::new(Column::new("a", 0)),
)) as Arc<dyn PhysicalExpr>;
assert!(!check_support(&unsupported_expr));
let unsupported_expr_2 = Arc::new(BinaryExpr::new(
Arc::new(Column::new("a", 0)),
Operator::Or,
Arc::new(NegativeExpr::new(Arc::new(Column::new("a", 0)))),
)) as Arc<dyn PhysicalExpr>;
assert!(!check_support(&unsupported_expr_2));
}
}

#[cfg(test)]
mod hash_join_tests {
use super::*;
@@ -574,7 +719,7 @@ mod hash_join_tests {
children_unbounded: vec![left_unbounded, right_unbounded],
};
let optimized_hash_join =
hash_join_swap_subrule(&initial_hash_join_state).unwrap()?;
hash_join_swap_subrule(initial_hash_join_state).unwrap()?;
let optimized_join_plan = optimized_hash_join.plan;

// If swap did happen
9 changes: 4 additions & 5 deletions datafusion/core/src/physical_optimizer/pruning.rs
@@ -46,10 +46,9 @@ use arrow::{
record_batch::RecordBatch,
};
use datafusion_common::{downcast_value, ScalarValue};
use datafusion_physical_expr::{expressions as phys_expr, PhysicalExprRef};

use datafusion_physical_expr::rewrite::{TreeNodeRewritable, TreeNodeRewriter};
use datafusion_physical_expr::utils::get_phys_expr_columns;
use datafusion_physical_expr::utils::collect_columns;
use datafusion_physical_expr::{expressions as phys_expr, PhysicalExprRef};
use log::trace;

/// Interface to pass statistics information to [`PruningPredicate`]
@@ -447,8 +446,8 @@ impl<'a> PruningExpressionBuilder<'a> {
required_columns: &'a mut RequiredStatColumns,
) -> Result<Self> {
// find column name; input could be a more complicated expression
let left_columns = get_phys_expr_columns(left);
let right_columns = get_phys_expr_columns(right);
let left_columns = collect_columns(left);
let right_columns = collect_columns(right);
let (column_expr, scalar_expr, columns, correct_operator) =
match (left_columns.len(), right_columns.len()) {
(1, 0) => (left, right, left_columns, op),
