Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Fix eq properties regression from #10434 #11363

Merged
merged 4 commits into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 113 additions & 70 deletions datafusion/physical-expr/src/equivalence/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,56 +223,11 @@ impl EquivalenceProperties {
}
}

// Discover new valid orderings in light of the new equality. For a discussion, see:
// https://github.com/apache/datafusion/issues/9812
let mut new_orderings = vec![];
for ordering in self.normalized_oeq_class().iter() {
let expressions = if left.eq(&ordering[0].expr) {
// Left expression is leading ordering
Some((ordering[0].options, right))
} else if right.eq(&ordering[0].expr) {
// Right expression is leading ordering
Some((ordering[0].options, left))
} else {
None
};
if let Some((leading_ordering, other_expr)) = expressions {
// Currently, we only handle expressions with a single child.
// TODO: It should be possible to handle expressions orderings like
// f(a, b, c), a, b, c if f is monotonic in all arguments.
// First expression after leading ordering
if let Some(next_expr) = ordering.get(1) {
let children = other_expr.children();
if children.len() == 1
&& children[0].eq(&next_expr.expr)
&& SortProperties::Ordered(leading_ordering)
== other_expr
.get_properties(&[ExprProperties {
sort_properties: SortProperties::Ordered(
leading_ordering,
),
range: Interval::make_unbounded(
&other_expr.data_type(&self.schema)?,
)?,
}])?
.sort_properties
{
// Assume existing ordering is [a ASC, b ASC]
// When equality a = f(b) is given, If we know that given ordering `[b ASC]`, ordering `[f(b) ASC]` is valid,
// then we can deduce that ordering `[b ASC]` is also valid.
// Hence, ordering `[b ASC]` can be added to the state as valid ordering.
// (e.g. existing ordering where leading ordering is removed)
new_orderings.push(ordering[1..].to_vec());
}
}
}
}
if !new_orderings.is_empty() {
self.oeq_class.add_new_orderings(new_orderings);
}

// Add equal expressions to the state
self.eq_group.add_equal_conditions(left, right);

// Discover any new orderings
self.discover_new_orderings(left)?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️

Ok(())
}

Expand Down Expand Up @@ -304,9 +259,78 @@ impl EquivalenceProperties {
self.constants.push(const_expr);
}
}

for ordering in self.normalized_oeq_class().iter() {
if let Err(e) = self.discover_new_orderings(&ordering[0].expr) {
log::debug!("error discovering new orderings: {e}");
}
}

self
}

// Discover new valid orderings in light of a new equality.
// Accepts a single argument (`expr`) which is used to determine
// which orderings should be updated.
// When constants or equivalence classes are changed, there may be new orderings
// that can be discovered with the new equivalence properties.
// For a discussion, see: https://github.com/apache/datafusion/issues/9812
fn discover_new_orderings(&mut self, expr: &Arc<dyn PhysicalExpr>) -> Result<()> {
let normalized_expr = self.eq_group().normalize_expr(Arc::clone(expr));
let eq_class = self
.eq_group
.classes
.iter()
.find_map(|class| {
class
.contains(&normalized_expr)
.then(|| class.clone().into_vec())
})
.unwrap_or_else(|| vec![Arc::clone(&normalized_expr)]);

let mut new_orderings: Vec<LexOrdering> = vec![];
for (ordering, next_expr) in self
.normalized_oeq_class()
.iter()
.filter(|ordering| ordering[0].expr.eq(&normalized_expr))
// First expression after leading ordering
.filter_map(|ordering| Some(ordering).zip(ordering.get(1)))
{
let leading_ordering = ordering[0].options;
// Currently, we only handle expressions with a single child.
// TODO: It should be possible to handle expressions orderings like
// f(a, b, c), a, b, c if f is monotonic in all arguments.
for equivalent_expr in &eq_class {
let children = equivalent_expr.children();
if children.len() == 1
&& children[0].eq(&next_expr.expr)
&& SortProperties::Ordered(leading_ordering)
== equivalent_expr
.get_properties(&[ExprProperties {
sort_properties: SortProperties::Ordered(
leading_ordering,
),
range: Interval::make_unbounded(
&equivalent_expr.data_type(&self.schema)?,
)?,
}])?
.sort_properties
{
// Assume existing ordering is [a ASC, b ASC]
// When equality a = f(b) is given, If we know that given ordering `[b ASC]`, ordering `[f(b) ASC]` is valid,
// then we can deduce that ordering `[b ASC]` is also valid.
// Hence, ordering `[b ASC]` can be added to the state as valid ordering.
// (e.g. existing ordering where leading ordering is removed)
new_orderings.push(ordering[1..].to_vec());
break;
}
}
}

self.oeq_class.add_new_orderings(new_orderings);
Ok(())
}

/// Updates the ordering equivalence group within assuming that the table
/// is re-sorted according to the argument `sort_exprs`. Note that constants
/// and equivalence classes are unchanged as they are unaffected by a re-sort.
Expand Down Expand Up @@ -2454,30 +2478,49 @@ mod tests {
];

for case in cases {
let mut properties = base_properties
.clone()
.add_constants(case.constants.into_iter().map(ConstExpr::from));
for [left, right] in &case.equal_conditions {
properties.add_equal_conditions(left, right)?
}

let sort = case
.sort_columns
.iter()
.map(|&name| {
col(name, &schema).map(|col| PhysicalSortExpr {
expr: col,
options: SortOptions::default(),
// Construct the equivalence properties in different orders
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also verified test coverage by running these tests without the code changes and verified they do in fact fail ✅


thread 'equivalence::properties::tests::test_eliminate_redundant_monotonic_sorts' panicked at datafusion/physical-expr/src/equivalence/properties.rs:2493:17:
assertion `left == right` failed: failed test '(a, b, c) -> (c)'
  left: false
 right: true
stack backtrace:
   0: rust_begin_unwind
             at /rustc/129f3b9964af4d4a709d1383930ade12dfe7c081/library/std/src/panicking.rs:652:5
   1: core::panicking::panic_fmt
             at /rustc/129f3b9964af4d4a709d1383930ade12dfe7c081/library/core/src/panicking.rs:72:14
   2: core::panicking::assert_failed_inner
   3: core::panicking::assert_failed
             at /rustc/129f3b9964af4d4a709d1383930ade12dfe7c081/library/core/src/panicking.rs:364:5
   4: datafusion_physical_expr::equivalence::properties::tests::test_eliminate_redundant_monotonic_sorts
             at ./src/equivalence/properties.rs:2493:17
   5: datafusion_physical_expr::equivalence::properties::tests::test_eliminate_redundant_monotonic_sorts::{{closure}}
             at ./src/equivalence/properties.rs:2382:54
   6: core::ops::function::FnOnce::call_once
             at /rustc/129f3b9964af4d4a709d1383930ade12dfe7c081/library/core/src/ops/function.rs:250:5
   7: core::ops::function::FnOnce::call_once
             at /rustc/129f3b9964af4d4a709d1383930ade12dfe7c081/library/core/src/ops/function.rs:250:5
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.

error: test failed, to rerun pass `--lib`
error: 1 target failed:
    `--lib`

// to exercise different code paths
// (The resulting properties _should_ be the same)
for properties in [
// Equal conditions before constants
{
let mut properties = base_properties.clone();
for [left, right] in &case.equal_conditions {
properties.add_equal_conditions(left, right)?
}
properties.add_constants(
case.constants.iter().cloned().map(ConstExpr::from),
)
},
// Constants before equal conditions
{
let mut properties = base_properties.clone().add_constants(
case.constants.iter().cloned().map(ConstExpr::from),
);
for [left, right] in &case.equal_conditions {
properties.add_equal_conditions(left, right)?
}
properties
},
] {
let sort = case
.sort_columns
.iter()
.map(|&name| {
col(name, &schema).map(|col| PhysicalSortExpr {
expr: col,
options: SortOptions::default(),
})
})
})
.collect::<Result<Vec<_>>>()?;
.collect::<Result<Vec<_>>>()?;

assert_eq!(
properties.ordering_satisfy(&sort),
case.should_satisfy_ordering,
"failed test '{}'",
case.name
);
assert_eq!(
properties.ordering_satisfy(&sort),
case.should_satisfy_ordering,
"failed test '{}'",
case.name
);
}
}

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ impl FilterExec {

Ok(Self {
predicate,
input: input.clone(),
input: Arc::clone(&input),
metrics: ExecutionPlanMetricsSet::new(),
default_selectivity,
cache,
Expand Down