Skip to content

Commit

Permalink
Preserve ordering equivalencies on with_reorder
Browse files Browse the repository at this point in the history
  • Loading branch information
gokselk committed Nov 7, 2024
1 parent d2a15b3 commit 076cb9e
Show file tree
Hide file tree
Showing 2 changed files with 216 additions and 2 deletions.
4 changes: 4 additions & 0 deletions datafusion/physical-expr/src/equivalence/class.rs
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,10 @@ impl EquivalenceGroup {
JoinType::RightSemi | JoinType::RightAnti => right_equivalences.clone(),
}
}

/// Returns `true` if `left` and `right` compare equal directly, or if some
/// equivalence class in this group contains both of them.
pub fn exprs_equal(&self, left: &Arc<dyn PhysicalExpr>, right: &Arc<dyn PhysicalExpr>) -> bool {
    // Direct equality short-circuits the scan over the classes.
    if left.eq(right) {
        return true;
    }
    self.iter()
        .any(|class| class.contains(left) && class.contains(right))
}
}

impl Display for EquivalenceGroup {
Expand Down
214 changes: 212 additions & 2 deletions datafusion/physical-expr/src/equivalence/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,11 +392,55 @@ impl EquivalenceProperties {
/// is re-sorted according to the argument `sort_exprs`. Note that constants
/// and equivalence classes are unchanged as they are unaffected by a re-sort.
///
/// Constant expressions are dropped from `sort_exprs` before the new ordering
/// is recorded. Every existing ordering whose leading entries match the
/// filtered `sort_exprs` (see `try_extend_ordering`) is kept in its extended
/// form, so the suffix of that ordering is preserved.
pub fn with_reorder(mut self, sort_exprs: Vec<PhysicalSortExpr>) -> Self {
    // Filter out constant expressions as they don't affect ordering.
    let filtered_exprs: Vec<PhysicalSortExpr> = sort_exprs
        .into_iter()
        .filter(|expr| !self.is_expr_constant(&expr.expr))
        .collect();

    // With nothing left to sort on, the empty prefix would match every
    // existing ordering in full, so the current orderings are returned as-is
    // instead of being rebuilt around an empty ordering.
    if filtered_exprs.is_empty() {
        return self;
    }

    // Scan the existing orderings *before* `self.oeq_class` is overwritten;
    // replacing it first would discard the suffixes we want to preserve.
    // Only keep results that actually add a suffix to the new ordering.
    let extended: Vec<Vec<PhysicalSortExpr>> = self
        .oeq_class
        .orderings
        .iter()
        .filter_map(|existing| self.try_extend_ordering(&filtered_exprs, existing))
        .filter(|candidate| candidate.len() > filtered_exprs.len())
        .collect();

    // The new ordering always comes first, followed by its valid extensions.
    let mut orderings = Vec::with_capacity(extended.len() + 1);
    orderings.push(filtered_exprs);
    orderings.extend(extended);

    self.oeq_class = OrderingEquivalenceClass::new(orderings);
    self
}

/// Tries to append the tail of an existing ordering to the new ordering.
///
/// The extension succeeds only when `new_order` lines up with the leading
/// entries of `existing`: each pair must hold expressions that the equivalence
/// group treats as equal and must carry identical sort options. On success the
/// result is `new_order` followed by the entries of `existing` beyond that
/// prefix; otherwise `None` is returned.
fn try_extend_ordering(
    &self,
    new_order: &[PhysicalSortExpr],
    existing: &[PhysicalSortExpr],
) -> Option<Vec<PhysicalSortExpr>> {
    let prefix_len = new_order.len();

    // A new ordering longer than the existing one can never be its prefix.
    if existing.len() < prefix_len {
        return None;
    }
    let (head, tail) = existing.split_at(prefix_len);

    // Compare the new ordering against the leading part of the existing one.
    let is_prefix = new_order.iter().zip(head).all(|(wanted, current)| {
        self.eq_group.exprs_equal(&wanted.expr, &current.expr)
            && wanted.options == current.options
    });

    // Keep the new ordering's own expressions and append the remaining tail.
    is_prefix.then(|| {
        let mut merged = Vec::with_capacity(existing.len());
        merged.extend_from_slice(new_order);
        merged.extend_from_slice(tail);
        merged
    })
}

/// Normalizes the given sort expressions (i.e. `sort_exprs`) using the
/// equivalence group and the ordering equivalence class within.
///
Expand Down Expand Up @@ -3645,4 +3689,170 @@ mod tests {

sort_expr
}

#[test]
fn test_with_reorder_constant_filtering() -> Result<()> {
    let schema = create_test_schema()?;
    let col_a = col("a", &schema)?;
    let col_b = col("b", &schema)?;

    // Declare `a` constant so that the re-sort is expected to ignore it.
    let eq_properties = EquivalenceProperties::new(schema.clone())
        .with_constants([ConstExpr::from(&col_a)]);

    // Requested ordering: [a, b], both with default sort options.
    let sort_exprs = vec![&col_a, &col_b]
        .into_iter()
        .map(|expr| PhysicalSortExpr {
            expr: Arc::clone(expr),
            options: SortOptions::default(),
        })
        .collect::<Vec<_>>();

    let result = eq_properties.with_reorder(sort_exprs);

    // A single ordering holding only `b` must remain, since `a` is constant.
    assert_eq!(result.oeq_class().len(), 1);
    let ordering = &result.oeq_class().orderings[0];
    assert_eq!(ordering.len(), 1);
    assert!(ordering[0].expr.eq(&col_b));

    Ok(())
}

#[test]
fn test_with_reorder_preserve_suffix() -> Result<()> {
    let schema = create_test_schema()?;
    let col_a = col("a", &schema)?;
    let col_b = col("b", &schema)?;
    let col_c = col("c", &schema)?;

    let asc = SortOptions::default();
    let desc = SortOptions {
        descending: true,
        nulls_first: true,
    };

    // Existing ordering: [a ASC, b DESC, c ASC]
    let initial = vec![(&col_a, asc), (&col_b, desc), (&col_c, asc)]
        .into_iter()
        .map(|(expr, options)| PhysicalSortExpr {
            expr: Arc::clone(expr),
            options,
        })
        .collect::<Vec<_>>();
    let mut eq_properties = EquivalenceProperties::new(schema.clone());
    eq_properties.add_new_orderings([initial]);

    // Re-sort by [a ASC], which is a prefix of the existing ordering.
    let result = eq_properties.with_reorder(vec![PhysicalSortExpr {
        expr: Arc::clone(&col_a),
        options: asc,
    }]);

    // Exactly one ordering must remain: [a ASC, b DESC, c ASC]
    assert_eq!(result.oeq_class().len(), 1);
    let ordering = &result.oeq_class().orderings[0];
    assert_eq!(ordering.len(), 3);
    for (sort_expr, expected) in ordering.iter().zip([&col_a, &col_b, &col_c]) {
        assert!(sort_expr.expr.eq(expected));
    }

    Ok(())
}

#[test]
fn test_with_reorder_equivalent_expressions() -> Result<()> {
    let schema = create_test_schema()?;
    let col_a = col("a", &schema)?;
    let col_b = col("b", &schema)?;
    let col_c = col("c", &schema)?;
    let asc = SortOptions::default();

    let mut eq_properties = EquivalenceProperties::new(schema.clone());

    // Register `a` and `b` as equivalent before any ordering is added.
    eq_properties.add_equal_conditions(&col_a, &col_b)?;

    // Existing ordering: [a ASC, c ASC]
    let initial = vec![&col_a, &col_c]
        .into_iter()
        .map(|expr| PhysicalSortExpr {
            expr: Arc::clone(expr),
            options: asc,
        })
        .collect::<Vec<_>>();
    eq_properties.add_new_orderings([initial]);

    // Re-sort by [b ASC]; `b` stands in for `a` through the equivalence.
    let result = eq_properties.with_reorder(vec![PhysicalSortExpr {
        expr: Arc::clone(&col_b),
        options: asc,
    }]);

    // Exactly one ordering must remain: [b ASC, c ASC]
    assert_eq!(result.oeq_class().len(), 1);
    let ordering = &result.oeq_class().orderings[0];
    assert_eq!(ordering.len(), 2);
    for (sort_expr, expected) in ordering.iter().zip([&col_b, &col_c]) {
        assert!(sort_expr.expr.eq(expected));
    }

    Ok(())
}

#[test]
fn test_with_reorder_incompatible_prefix() -> Result<()> {
    let schema = create_test_schema()?;
    let col_a = col("a", &schema)?;
    let col_b = col("b", &schema)?;

    let asc = SortOptions::default();
    let desc = SortOptions {
        descending: true,
        nulls_first: true,
    };

    // Existing ordering: [a ASC, b DESC]
    let initial = vec![(&col_a, asc), (&col_b, desc)]
        .into_iter()
        .map(|(expr, options)| PhysicalSortExpr {
            expr: Arc::clone(expr),
            options,
        })
        .collect::<Vec<_>>();
    let mut eq_properties = EquivalenceProperties::new(schema.clone());
    eq_properties.add_new_orderings([initial]);

    // Re-sort by [a DESC]; the direction conflicts with the existing prefix.
    let new_order = vec![PhysicalSortExpr {
        expr: Arc::clone(&col_a),
        options: desc,
    }];
    let result = eq_properties.with_reorder(new_order.clone());

    // Nothing can be carried over, so only the new ordering must remain.
    assert_eq!(result.oeq_class().len(), 1);
    assert_eq!(result.oeq_class().orderings[0], new_order);

    Ok(())
}
}

0 comments on commit 076cb9e

Please sign in to comment.