Skip to content

Commit

Permalink
fix: Fix eq properties regression from apache#10434 (apache#11363)
Browse files Browse the repository at this point in the history
* discover new orderings when constants are added

* more comments

* reduce nesting + describe argument

* lint?
  • Loading branch information
suremarc authored and findepi committed Jul 16, 2024
1 parent dbe9750 commit 644c6fa
Showing 1 changed file with 113 additions and 70 deletions.
183 changes: 113 additions & 70 deletions datafusion/physical-expr/src/equivalence/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,56 +223,11 @@ impl EquivalenceProperties {
}
}

// Discover new valid orderings in light of the new equality. For a discussion, see:
// https://github.com/apache/datafusion/issues/9812
let mut new_orderings = vec![];
for ordering in self.normalized_oeq_class().iter() {
let expressions = if left.eq(&ordering[0].expr) {
// Left expression is leading ordering
Some((ordering[0].options, right))
} else if right.eq(&ordering[0].expr) {
// Right expression is leading ordering
Some((ordering[0].options, left))
} else {
None
};
if let Some((leading_ordering, other_expr)) = expressions {
// Currently, we only handle expressions with a single child.
// TODO: It should be possible to handle expressions orderings like
// f(a, b, c), a, b, c if f is monotonic in all arguments.
// First expression after leading ordering
if let Some(next_expr) = ordering.get(1) {
let children = other_expr.children();
if children.len() == 1
&& children[0].eq(&next_expr.expr)
&& SortProperties::Ordered(leading_ordering)
== other_expr
.get_properties(&[ExprProperties {
sort_properties: SortProperties::Ordered(
leading_ordering,
),
range: Interval::make_unbounded(
&other_expr.data_type(&self.schema)?,
)?,
}])?
.sort_properties
{
// Assume existing ordering is [a ASC, b ASC]
// When equality a = f(b) is given, If we know that given ordering `[b ASC]`, ordering `[f(b) ASC]` is valid,
// then we can deduce that ordering `[b ASC]` is also valid.
// Hence, ordering `[b ASC]` can be added to the state as valid ordering.
// (e.g. existing ordering where leading ordering is removed)
new_orderings.push(ordering[1..].to_vec());
}
}
}
}
if !new_orderings.is_empty() {
self.oeq_class.add_new_orderings(new_orderings);
}

// Add equal expressions to the state
self.eq_group.add_equal_conditions(left, right);

// Discover any new orderings
self.discover_new_orderings(left)?;
Ok(())
}

Expand Down Expand Up @@ -304,9 +259,78 @@ impl EquivalenceProperties {
self.constants.push(const_expr);
}
}

for ordering in self.normalized_oeq_class().iter() {
if let Err(e) = self.discover_new_orderings(&ordering[0].expr) {
log::debug!("error discovering new orderings: {e}");
}
}

self
}

// Discover new valid orderings in light of a new equality.
// Accepts a single argument (`expr`) which is used to determine
// which orderings should be updated.
// When constants or equivalence classes are changed, there may be new orderings
// that can be discovered with the new equivalence properties.
// For a discussion, see: https://github.com/apache/datafusion/issues/9812
fn discover_new_orderings(&mut self, expr: &Arc<dyn PhysicalExpr>) -> Result<()> {
let normalized_expr = self.eq_group().normalize_expr(Arc::clone(expr));
let eq_class = self
.eq_group
.classes
.iter()
.find_map(|class| {
class
.contains(&normalized_expr)
.then(|| class.clone().into_vec())
})
.unwrap_or_else(|| vec![Arc::clone(&normalized_expr)]);

let mut new_orderings: Vec<LexOrdering> = vec![];
for (ordering, next_expr) in self
.normalized_oeq_class()
.iter()
.filter(|ordering| ordering[0].expr.eq(&normalized_expr))
// First expression after leading ordering
.filter_map(|ordering| Some(ordering).zip(ordering.get(1)))
{
let leading_ordering = ordering[0].options;
// Currently, we only handle expressions with a single child.
// TODO: It should be possible to handle expressions orderings like
// f(a, b, c), a, b, c if f is monotonic in all arguments.
for equivalent_expr in &eq_class {
let children = equivalent_expr.children();
if children.len() == 1
&& children[0].eq(&next_expr.expr)
&& SortProperties::Ordered(leading_ordering)
== equivalent_expr
.get_properties(&[ExprProperties {
sort_properties: SortProperties::Ordered(
leading_ordering,
),
range: Interval::make_unbounded(
&equivalent_expr.data_type(&self.schema)?,
)?,
}])?
.sort_properties
{
// Assume existing ordering is [a ASC, b ASC]
// When equality a = f(b) is given, If we know that given ordering `[b ASC]`, ordering `[f(b) ASC]` is valid,
// then we can deduce that ordering `[b ASC]` is also valid.
// Hence, ordering `[b ASC]` can be added to the state as valid ordering.
// (e.g. existing ordering where leading ordering is removed)
new_orderings.push(ordering[1..].to_vec());
break;
}
}
}

self.oeq_class.add_new_orderings(new_orderings);
Ok(())
}

/// Updates the ordering equivalence group within assuming that the table
/// is re-sorted according to the argument `sort_exprs`. Note that constants
/// and equivalence classes are unchanged as they are unaffected by a re-sort.
Expand Down Expand Up @@ -2454,30 +2478,49 @@ mod tests {
];

for case in cases {
let mut properties = base_properties
.clone()
.add_constants(case.constants.into_iter().map(ConstExpr::from));
for [left, right] in &case.equal_conditions {
properties.add_equal_conditions(left, right)?
}

let sort = case
.sort_columns
.iter()
.map(|&name| {
col(name, &schema).map(|col| PhysicalSortExpr {
expr: col,
options: SortOptions::default(),
// Construct the equivalence properties in different orders
// to exercise different code paths
// (The resulting properties _should_ be the same)
for properties in [
// Equal conditions before constants
{
let mut properties = base_properties.clone();
for [left, right] in &case.equal_conditions {
properties.add_equal_conditions(left, right)?
}
properties.add_constants(
case.constants.iter().cloned().map(ConstExpr::from),
)
},
// Constants before equal conditions
{
let mut properties = base_properties.clone().add_constants(
case.constants.iter().cloned().map(ConstExpr::from),
);
for [left, right] in &case.equal_conditions {
properties.add_equal_conditions(left, right)?
}
properties
},
] {
let sort = case
.sort_columns
.iter()
.map(|&name| {
col(name, &schema).map(|col| PhysicalSortExpr {
expr: col,
options: SortOptions::default(),
})
})
})
.collect::<Result<Vec<_>>>()?;
.collect::<Result<Vec<_>>>()?;

assert_eq!(
properties.ordering_satisfy(&sort),
case.should_satisfy_ordering,
"failed test '{}'",
case.name
);
assert_eq!(
properties.ordering_satisfy(&sort),
case.should_satisfy_ordering,
"failed test '{}'",
case.name
);
}
}

Ok(())
Expand Down

0 comments on commit 644c6fa

Please sign in to comment.