Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deprecate PhysicalSortRequirement::from_sort_exprs and PhysicalSortRequirement::to_sort_exprs #13222

Merged
merged 5 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1423,7 +1423,6 @@ pub(crate) mod tests {
use datafusion_physical_expr::expressions::{BinaryExpr, Literal};
use datafusion_physical_expr::{
expressions::binary, expressions::lit, LexOrdering, PhysicalSortExpr,
PhysicalSortRequirement,
};
use datafusion_physical_expr_common::sort_expr::LexRequirement;
use datafusion_physical_plan::PlanProperties;
Expand Down Expand Up @@ -1496,9 +1495,7 @@ pub(crate) mod tests {
if self.expr.is_empty() {
vec![None]
} else {
vec![Some(PhysicalSortRequirement::from_sort_exprs(
self.expr.iter(),
))]
vec![Some(LexRequirement::from(self.expr.clone()))]
Copy link
Contributor Author

@alamb alamb Nov 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a pretty good example of the potential improvement -- it is now clear this function is returning a LexRequirement and that it clones the underlying LexOrdering structure

This was still happening on main but the conversion was hidden

}
}

Expand Down
8 changes: 4 additions & 4 deletions datafusion/core/src/physical_optimizer/enforce_sorting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ use crate::physical_plan::{Distribution, ExecutionPlan, InputOrderMode};

use datafusion_common::plan_err;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_physical_expr::{Partitioning, PhysicalSortRequirement};
use datafusion_physical_expr::Partitioning;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
Expand Down Expand Up @@ -221,7 +221,7 @@ fn replace_with_partial_sort(
// here we're trying to find the common prefix for sorted columns that is required for the
// sort and already satisfied by the given ordering
let child_eq_properties = child.equivalence_properties();
let sort_req = PhysicalSortRequirement::from_sort_exprs(sort_plan.expr());
let sort_req = LexRequirement::from(sort_plan.expr().clone());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, there is no more cloning going on than previously, just now the clone is explicit


let mut common_prefix_length = 0;
while child_eq_properties.ordering_satisfy_requirement(&LexRequirement {
Expand Down Expand Up @@ -275,8 +275,8 @@ fn parallelize_sorts(
{
// Take the initial sort expressions and requirements
let (sort_exprs, fetch) = get_sort_exprs(&requirements.plan)?;
let sort_reqs = PhysicalSortRequirement::from_sort_exprs(sort_exprs);
let sort_exprs = LexOrdering::new(sort_exprs.to_vec());
let sort_reqs = LexRequirement::from(sort_exprs.clone());
let sort_exprs = sort_exprs.clone();

// If there is a connection between a `CoalescePartitionsExec` and a
// global sort that satisfy the requirements (i.e. intermediate
Expand Down
27 changes: 13 additions & 14 deletions datafusion/core/src/physical_optimizer/sort_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ fn pushdown_sorts_helper(
if is_sort(plan) {
let required_ordering = plan
.output_ordering()
.map(PhysicalSortRequirement::from_sort_exprs)
.cloned()
.map(LexRequirement::from)
.unwrap_or_default();
if !satisfy_parent {
// Make sure this `SortExec` satisfies parent requirements:
Expand Down Expand Up @@ -180,11 +181,12 @@ fn pushdown_requirement_to_children(
RequirementsCompatibility::NonCompatible => Ok(None),
}
} else if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
let sort_req = PhysicalSortRequirement::from_sort_exprs(
let sort_req = LexRequirement::from(
sort_exec
.properties()
.output_ordering()
.unwrap_or(&LexOrdering::default()),
.cloned()
.unwrap_or(LexOrdering::default()),
);
if sort_exec
.properties()
Expand All @@ -205,10 +207,11 @@ fn pushdown_requirement_to_children(
.iter()
.all(|maintain| *maintain)
{
let output_req = PhysicalSortRequirement::from_sort_exprs(
let output_req = LexRequirement::from(
plan.properties()
.output_ordering()
.unwrap_or(&LexOrdering::default()),
.cloned()
.unwrap_or(LexOrdering::default()),
);
// Push down through operator with fetch when:
// - requirement is aligned with output ordering
Expand All @@ -227,14 +230,12 @@ fn pushdown_requirement_to_children(
} else if is_union(plan) {
// UnionExec does not have real sort requirements for its input. Here we change the adjusted_request_ordering to UnionExec's output ordering and
// propagate the sort requirements down to correct the unnecessary descendant SortExec under the UnionExec
let req = (!parent_required.is_empty())
.then(|| LexRequirement::new(parent_required.to_vec()));
let req = (!parent_required.is_empty()).then(|| parent_required.clone());
Ok(Some(vec![req; plan.children().len()]))
} else if let Some(smj) = plan.as_any().downcast_ref::<SortMergeJoinExec>() {
// If the current plan is SortMergeJoinExec
let left_columns_len = smj.left().schema().fields().len();
let parent_required_expr =
PhysicalSortRequirement::to_sort_exprs(parent_required.iter().cloned());
let parent_required_expr = LexOrdering::from(parent_required.clone());
match expr_source_side(
parent_required_expr.as_ref(),
smj.join_type(),
Expand All @@ -251,8 +252,7 @@ fn pushdown_requirement_to_children(
smj.schema().fields.len() - smj.right().schema().fields.len();
let new_right_required =
shift_right_required(parent_required, right_offset)?;
let new_right_required_expr =
PhysicalSortRequirement::to_sort_exprs(new_right_required);
let new_right_required_expr = LexOrdering::from(new_right_required);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this structure makes it much clearer what is going on

try_pushdown_requirements_to_join(
smj,
parent_required,
Expand All @@ -278,8 +278,7 @@ fn pushdown_requirement_to_children(
// Pushing down is not beneficial
Ok(None)
} else if is_sort_preserving_merge(plan) {
let new_ordering =
PhysicalSortRequirement::to_sort_exprs(parent_required.to_vec());
let new_ordering = LexOrdering::from(parent_required.clone());
let mut spm_eqs = plan.equivalence_properties().clone();
// Sort preserving merge will have new ordering, one requirement above is pushed down to its below.
spm_eqs = spm_eqs.with_reorder(new_ordering);
Expand Down Expand Up @@ -412,7 +411,7 @@ fn try_pushdown_requirements_to_join(
let should_pushdown = smj_eqs.ordering_satisfy_requirement(parent_required);
Ok(should_pushdown.then(|| {
let mut required_input_ordering = smj.required_input_ordering();
let new_req = Some(PhysicalSortRequirement::from_sort_exprs(sort_expr));
let new_req = Some(LexRequirement::from(sort_expr.clone()));
match push_side {
JoinSide::Left => {
required_input_ordering[0] = new_req;
Expand Down
8 changes: 2 additions & 6 deletions datafusion/core/src/physical_optimizer/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,7 @@ use datafusion_physical_plan::{

use async_trait::async_trait;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_expr_common::sort_expr::{
LexOrdering, LexRequirement, PhysicalSortRequirement,
};
use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};

async fn register_current_csv(
ctx: &SessionContext,
Expand Down Expand Up @@ -419,9 +417,7 @@ impl ExecutionPlan for RequirementsTestExec {
}

fn required_input_ordering(&self) -> Vec<Option<LexRequirement>> {
let requirement = PhysicalSortRequirement::from_sort_exprs(
self.required_input_ordering.as_ref().iter(),
);
let requirement = LexRequirement::from(self.required_input_ordering.clone());
vec![Some(requirement)]
}

Expand Down
10 changes: 4 additions & 6 deletions datafusion/core/src/physical_optimizer/update_aggr_exprs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use datafusion_physical_expr::aggregate::AggregateFunctionExpr;
use datafusion_physical_expr::{
reverse_order_bys, EquivalenceProperties, PhysicalSortRequirement,
};
use datafusion_physical_expr_common::sort_expr::LexRequirement;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::aggregates::concat_slices;
use datafusion_physical_plan::windows::get_ordered_partition_by_indices;
Expand Down Expand Up @@ -139,12 +139,10 @@ fn try_convert_aggregate_if_better(
aggr_exprs
.into_iter()
.map(|aggr_expr| {
let aggr_sort_exprs = &aggr_expr.order_bys().cloned().unwrap_or_default();
let aggr_sort_exprs = aggr_expr.order_bys().unwrap_or(LexOrdering::empty());
let reverse_aggr_sort_exprs = reverse_order_bys(aggr_sort_exprs);
let aggr_sort_reqs =
PhysicalSortRequirement::from_sort_exprs(aggr_sort_exprs.iter());
let reverse_aggr_req =
PhysicalSortRequirement::from_sort_exprs(&reverse_aggr_sort_exprs.inner);
let aggr_sort_reqs = LexRequirement::from(aggr_sort_exprs.clone());
let reverse_aggr_req = LexRequirement::from(reverse_aggr_sort_exprs);

// If the aggregate expression benefits from input ordering, and
// there is an actual ordering enabling this, try to update the
Expand Down
5 changes: 3 additions & 2 deletions datafusion/core/src/physical_optimizer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ use crate::physical_plan::union::UnionExec;
use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
use crate::physical_plan::{ExecutionPlan, ExecutionPlanProperties};

use datafusion_physical_expr::{LexRequirement, PhysicalSortRequirement};
use datafusion_physical_expr::LexRequirement;
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use datafusion_physical_plan::tree_node::PlanContext;

Expand All @@ -38,7 +39,7 @@ pub fn add_sort_above<T: Clone + Default>(
sort_requirements: LexRequirement,
fetch: Option<usize>,
) -> PlanContext<T> {
let mut sort_expr = PhysicalSortRequirement::to_sort_exprs(sort_requirements);
let mut sort_expr = LexOrdering::from(sort_requirements);
sort_expr.inner.retain(|sort_expr| {
!node
.plan
Expand Down
29 changes: 6 additions & 23 deletions datafusion/physical-expr-common/src/sort_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,26 +300,14 @@ impl PhysicalSortRequirement {
})
}

/// Returns [`PhysicalSortRequirement`] that requires the exact
/// sort of the [`PhysicalSortExpr`]s in `ordering`
///
/// This method takes `&'a PhysicalSortExpr` to make it easy to
/// use implementing [`ExecutionPlan::required_input_ordering`].
///
/// [`ExecutionPlan::required_input_ordering`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html#method.required_input_ordering
#[deprecated(since = "43.0.0", note = "use LexRequirement::from_lex_ordering")]
pub fn from_sort_exprs<'a>(
ordering: impl IntoIterator<Item = &'a PhysicalSortExpr>,
) -> LexRequirement {
let ordering = ordering.into_iter().cloned().collect();
LexRequirement::from_lex_ordering(ordering)
}

/// Converts an iterator of [`PhysicalSortRequirement`] into a Vec
/// of [`PhysicalSortExpr`]s.
///
/// This function converts `PhysicalSortRequirement` to `PhysicalSortExpr`
/// for each entry in the input. If required ordering is None for an entry
/// default ordering `ASC, NULLS LAST` if given (see the `PhysicalSortExpr::from`).
#[deprecated(since = "43.0.0", note = "use LexOrdering::from_lex_requirement")]
pub fn to_sort_exprs(
requirements: impl IntoIterator<Item = PhysicalSortRequirement>,
) -> LexOrdering {
Expand Down Expand Up @@ -420,8 +408,8 @@ impl LexOrdering {
/// This function converts `PhysicalSortRequirement` to `PhysicalSortExpr`
/// for each entry in the input. If required ordering is None for an entry
/// default ordering `ASC, NULLS LAST` if given (see the `PhysicalSortExpr::from`).
pub fn from_lex_requirement(requirements: LexRequirement) -> LexOrdering {
requirements
pub fn from_lex_requirement(requirement: LexRequirement) -> LexOrdering {
requirement
.into_iter()
.map(PhysicalSortExpr::from)
.collect()
Expand Down Expand Up @@ -545,15 +533,10 @@ impl LexRequirement {
self.inner.push(physical_sort_requirement)
}

/// Create a new [`LexRequirement`] from a vector of [`PhysicalSortExpr`]s.
/// Create a new [`LexRequirement`] from a [`LexOrdering`]
///
/// Returns [`PhysicalSortRequirement`] that requires the exact
/// Returns [`LexRequirement`] that requires the exact
/// sort of the [`PhysicalSortExpr`]s in `ordering`
///
/// This method takes `&'a PhysicalSortExpr` to make it easy to
/// use implementing [`ExecutionPlan::required_input_ordering`].
///
/// [`ExecutionPlan::required_input_ordering`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html#method.required_input_ordering
pub fn from_lex_ordering(ordering: LexOrdering) -> Self {
Self::new(
ordering
Expand Down
4 changes: 2 additions & 2 deletions datafusion/physical-expr/src/equivalence/class.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,11 +476,11 @@ impl EquivalenceGroup {
/// sort expressions.
pub fn normalize_sort_exprs(&self, sort_exprs: &LexOrdering) -> LexOrdering {
// Convert sort expressions to sort requirements:
let sort_reqs = PhysicalSortRequirement::from_sort_exprs(sort_exprs.iter());
let sort_reqs = LexRequirement::from(sort_exprs.clone());
// Normalize the requirements:
let normalized_sort_reqs = self.normalize_sort_requirements(&sort_reqs);
// Convert sort requirements back to sort expressions:
PhysicalSortRequirement::to_sort_exprs(normalized_sort_reqs.inner)
LexOrdering::from(normalized_sort_reqs)
}

/// This function applies the `normalize_sort_requirement` function for all
Expand Down
12 changes: 6 additions & 6 deletions datafusion/physical-expr/src/equivalence/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,11 +409,11 @@ impl EquivalenceProperties {
/// after deduplication.
fn normalize_sort_exprs(&self, sort_exprs: &LexOrdering) -> LexOrdering {
// Convert sort expressions to sort requirements:
let sort_reqs = PhysicalSortRequirement::from_sort_exprs(sort_exprs.iter());
let sort_reqs = LexRequirement::from(sort_exprs.clone());
// Normalize the requirements:
let normalized_sort_reqs = self.normalize_sort_requirements(&sort_reqs);
// Convert sort requirements back to sort expressions:
PhysicalSortRequirement::to_sort_exprs(normalized_sort_reqs)
LexOrdering::from(normalized_sort_reqs)
}

/// Normalizes the given sort requirements (i.e. `sort_reqs`) using the
Expand Down Expand Up @@ -454,7 +454,7 @@ impl EquivalenceProperties {
/// orderings.
pub fn ordering_satisfy(&self, given: &LexOrdering) -> bool {
// Convert the given sort expressions to sort requirements:
let sort_requirements = PhysicalSortRequirement::from_sort_exprs(given.iter());
let sort_requirements = LexRequirement::from(given.clone());
self.ordering_satisfy_requirement(&sort_requirements)
}

Expand Down Expand Up @@ -548,11 +548,11 @@ impl EquivalenceProperties {
rhs: &LexOrdering,
) -> Option<LexOrdering> {
// Convert the given sort expressions to sort requirements:
let lhs = PhysicalSortRequirement::from_sort_exprs(lhs);
let rhs = PhysicalSortRequirement::from_sort_exprs(rhs);
let lhs = LexRequirement::from(lhs.clone());
let rhs = LexRequirement::from(rhs.clone());
let finer = self.get_finer_requirement(&lhs, &rhs);
// Convert the chosen sort requirements back to sort expressions:
finer.map(PhysicalSortRequirement::to_sort_exprs)
finer.map(LexOrdering::from)
}

/// Returns the finer ordering among the requirements `lhs` and `rhs`,
Expand Down
6 changes: 3 additions & 3 deletions datafusion/physical-optimizer/src/output_requirements.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use datafusion_physical_plan::{
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{Result, Statistics};
use datafusion_physical_expr::{Distribution, LexRequirement, PhysicalSortRequirement};
use datafusion_physical_expr::{Distribution, LexRequirement};
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion_physical_plan::{ExecutionPlanProperties, PlanProperties};

Expand Down Expand Up @@ -256,13 +256,13 @@ fn require_top_ordering_helper(
// Therefore; we check the sort expression field of the SortExec to assign the requirements.
let req_ordering = sort_exec.expr();
let req_dist = sort_exec.required_input_distribution()[0].clone();
let reqs = PhysicalSortRequirement::from_sort_exprs(req_ordering);
let reqs = LexRequirement::from(req_ordering.clone());
Ok((
Arc::new(OutputRequirementExec::new(plan, Some(reqs), req_dist)) as _,
true,
))
} else if let Some(spm) = plan.as_any().downcast_ref::<SortPreservingMergeExec>() {
let reqs = PhysicalSortRequirement::from_sort_exprs(spm.expr());
let reqs = LexRequirement::from(spm.expr().clone());
Ok((
Arc::new(OutputRequirementExec::new(
plan,
Expand Down
6 changes: 2 additions & 4 deletions datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1074,9 +1074,7 @@ pub fn get_finer_aggregate_exprs_requirement(
);
}

Ok(PhysicalSortRequirement::from_sort_exprs(
requirement.inner.iter(),
))
Ok(LexRequirement::from(requirement))
}

/// Returns physical expressions for arguments to evaluate against a batch.
Expand Down Expand Up @@ -2304,7 +2302,7 @@ mod tests {
&eq_properties,
&AggregateMode::Partial,
)?;
let res = PhysicalSortRequirement::to_sort_exprs(res);
let res = LexOrdering::from(res);
assert_eq!(res, common_requirement);
Ok(())
}
Expand Down
10 changes: 3 additions & 7 deletions datafusion/physical-plan/src/joins/sort_merge_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::equivalence::join_equivalence_properties;
use datafusion_physical_expr::{PhysicalExprRef, PhysicalSortRequirement};
use datafusion_physical_expr::PhysicalExprRef;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
use futures::{Stream, StreamExt};

Expand Down Expand Up @@ -297,12 +297,8 @@ impl ExecutionPlan for SortMergeJoinExec {

fn required_input_ordering(&self) -> Vec<Option<LexRequirement>> {
vec![
Some(PhysicalSortRequirement::from_sort_exprs(
self.left_sort_exprs.iter(),
)),
Some(PhysicalSortRequirement::from_sort_exprs(
self.right_sort_exprs.iter(),
)),
Some(LexRequirement::from(self.left_sort_exprs.clone())),
Some(LexRequirement::from(self.right_sort_exprs.clone())),
]
}

Expand Down
Loading