diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index 6863978610db..4bbb995c365e 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -1423,7 +1423,6 @@ pub(crate) mod tests { use datafusion_physical_expr::expressions::{BinaryExpr, Literal}; use datafusion_physical_expr::{ expressions::binary, expressions::lit, LexOrdering, PhysicalSortExpr, - PhysicalSortRequirement, }; use datafusion_physical_expr_common::sort_expr::LexRequirement; use datafusion_physical_plan::PlanProperties; @@ -1496,9 +1495,7 @@ pub(crate) mod tests { if self.expr.is_empty() { vec![None] } else { - vec![Some(PhysicalSortRequirement::from_sort_exprs( - self.expr.iter(), - ))] + vec![Some(LexRequirement::from(self.expr.clone()))] } } diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs index adc3d7cac10c..636d52ccc9cd 100644 --- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs @@ -61,7 +61,7 @@ use crate::physical_plan::{Distribution, ExecutionPlan, InputOrderMode}; use datafusion_common::plan_err; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; -use datafusion_physical_expr::{Partitioning, PhysicalSortRequirement}; +use datafusion_physical_expr::Partitioning; use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement}; use datafusion_physical_optimizer::PhysicalOptimizerRule; use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; @@ -221,7 +221,7 @@ fn replace_with_partial_sort( // here we're trying to find the common prefix for sorted columns that is required for the // sort and already satisfied by the given ordering let child_eq_properties = child.equivalence_properties(); - let sort_req = PhysicalSortRequirement::from_sort_exprs(sort_plan.expr()); + let sort_req = LexRequirement::from(sort_plan.expr().clone()); let mut common_prefix_length = 0; while child_eq_properties.ordering_satisfy_requirement(&LexRequirement { @@ -275,8 +275,8 @@ fn parallelize_sorts( { // Take the initial sort expressions and requirements let (sort_exprs, fetch) = get_sort_exprs(&requirements.plan)?; - let sort_reqs = PhysicalSortRequirement::from_sort_exprs(sort_exprs); - let sort_exprs = LexOrdering::new(sort_exprs.to_vec()); + let sort_reqs = LexRequirement::from(sort_exprs.clone()); + let sort_exprs = sort_exprs.clone(); // If there is a connection between a `CoalescePartitionsExec` and a // global sort that satisfy the requirements (i.e. intermediate diff --git a/datafusion/core/src/physical_optimizer/sort_pushdown.rs b/datafusion/core/src/physical_optimizer/sort_pushdown.rs index e231e719b25f..42d682169da8 100644 --- a/datafusion/core/src/physical_optimizer/sort_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/sort_pushdown.rs @@ -94,7 +94,8 @@ fn pushdown_sorts_helper( if is_sort(plan) { let required_ordering = plan .output_ordering() - .map(PhysicalSortRequirement::from_sort_exprs) + .cloned() + .map(LexRequirement::from) .unwrap_or_default(); if !satisfy_parent { // Make sure this `SortExec` satisfies parent requirements: @@ -180,11 +181,12 @@ fn pushdown_requirement_to_children( RequirementsCompatibility::NonCompatible => Ok(None), } } else if let Some(sort_exec) = plan.as_any().downcast_ref::() { - let sort_req = PhysicalSortRequirement::from_sort_exprs( + let sort_req = LexRequirement::from( sort_exec .properties() .output_ordering() - .unwrap_or(&LexOrdering::default()), + .cloned() + .unwrap_or(LexOrdering::default()), ); if sort_exec .properties() @@ -205,10 +207,11 @@ fn pushdown_requirement_to_children( .iter() .all(|maintain| *maintain) { - let output_req = PhysicalSortRequirement::from_sort_exprs( + let output_req = LexRequirement::from( plan.properties() .output_ordering() - .unwrap_or(&LexOrdering::default()), + .cloned() + .unwrap_or(LexOrdering::default()), ); // Push down through operator with fetch when: // - requirement is aligned with output ordering @@ -227,14 +230,12 @@ fn pushdown_requirement_to_children( } else if is_union(plan) { // UnionExec does not have real sort requirements for its input. Here we change the adjusted_request_ordering to UnionExec's output ordering and // propagate the sort requirements down to correct the unnecessary descendant SortExec under the UnionExec - let req = (!parent_required.is_empty()) - .then(|| LexRequirement::new(parent_required.to_vec())); + let req = (!parent_required.is_empty()).then(|| parent_required.clone()); Ok(Some(vec![req; plan.children().len()])) } else if let Some(smj) = plan.as_any().downcast_ref::() { // If the current plan is SortMergeJoinExec let left_columns_len = smj.left().schema().fields().len(); - let parent_required_expr = - PhysicalSortRequirement::to_sort_exprs(parent_required.iter().cloned()); + let parent_required_expr = LexOrdering::from(parent_required.clone()); match expr_source_side( parent_required_expr.as_ref(), smj.join_type(), @@ -251,8 +252,7 @@ fn pushdown_requirement_to_children( smj.schema().fields.len() - smj.right().schema().fields.len(); let new_right_required = shift_right_required(parent_required, right_offset)?; - let new_right_required_expr = - PhysicalSortRequirement::to_sort_exprs(new_right_required); + let new_right_required_expr = LexOrdering::from(new_right_required); try_pushdown_requirements_to_join( smj, parent_required, @@ -278,8 +278,7 @@ fn pushdown_requirement_to_children( // Pushing down is not beneficial Ok(None) } else if is_sort_preserving_merge(plan) { - let new_ordering = - PhysicalSortRequirement::to_sort_exprs(parent_required.to_vec()); + let new_ordering = LexOrdering::from(parent_required.clone()); let mut spm_eqs = plan.equivalence_properties().clone(); // Sort preserving merge will have new ordering, one requirement above is pushed down to its below. spm_eqs = spm_eqs.with_reorder(new_ordering); @@ -412,7 +411,7 @@ fn try_pushdown_requirements_to_join( let should_pushdown = smj_eqs.ordering_satisfy_requirement(parent_required); Ok(should_pushdown.then(|| { let mut required_input_ordering = smj.required_input_ordering(); - let new_req = Some(PhysicalSortRequirement::from_sort_exprs(sort_expr)); + let new_req = Some(LexRequirement::from(sort_expr.clone())); match push_side { JoinSide::Left => { required_input_ordering[0] = new_req; diff --git a/datafusion/core/src/physical_optimizer/test_utils.rs b/datafusion/core/src/physical_optimizer/test_utils.rs index bdf16300ea87..88bb0b6fef23 100644 --- a/datafusion/core/src/physical_optimizer/test_utils.rs +++ b/datafusion/core/src/physical_optimizer/test_utils.rs @@ -56,9 +56,7 @@ use datafusion_physical_plan::{ use async_trait::async_trait; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; -use datafusion_physical_expr_common::sort_expr::{ - LexOrdering, LexRequirement, PhysicalSortRequirement, -}; +use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement}; async fn register_current_csv( ctx: &SessionContext, @@ -419,9 +417,7 @@ impl ExecutionPlan for RequirementsTestExec { } fn required_input_ordering(&self) -> Vec> { - let requirement = PhysicalSortRequirement::from_sort_exprs( - self.required_input_ordering.as_ref().iter(), - ); + let requirement = LexRequirement::from(self.required_input_ordering.clone()); vec![Some(requirement)] } diff --git a/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs b/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs index c9363b00e18f..d563d0c56d36 100644 --- a/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs +++ b/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs @@ -27,7 +27,7 @@ use datafusion_physical_expr::aggregate::AggregateFunctionExpr; use datafusion_physical_expr::{ reverse_order_bys, EquivalenceProperties, PhysicalSortRequirement, }; -use datafusion_physical_expr_common::sort_expr::LexRequirement; +use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement}; use datafusion_physical_optimizer::PhysicalOptimizerRule; use datafusion_physical_plan::aggregates::concat_slices; use datafusion_physical_plan::windows::get_ordered_partition_by_indices; @@ -139,12 +139,10 @@ fn try_convert_aggregate_if_better( aggr_exprs .into_iter() .map(|aggr_expr| { - let aggr_sort_exprs = &aggr_expr.order_bys().cloned().unwrap_or_default(); + let aggr_sort_exprs = aggr_expr.order_bys().unwrap_or(LexOrdering::empty()); let reverse_aggr_sort_exprs = reverse_order_bys(aggr_sort_exprs); - let aggr_sort_reqs = - PhysicalSortRequirement::from_sort_exprs(aggr_sort_exprs.iter()); - let reverse_aggr_req = - PhysicalSortRequirement::from_sort_exprs(&reverse_aggr_sort_exprs.inner); + let aggr_sort_reqs = LexRequirement::from(aggr_sort_exprs.clone()); + let reverse_aggr_req = LexRequirement::from(reverse_aggr_sort_exprs); // If the aggregate expression benefits from input ordering, and // there is an actual ordering enabling this, try to update the diff --git a/datafusion/core/src/physical_optimizer/utils.rs b/datafusion/core/src/physical_optimizer/utils.rs index 8007d8cc7f00..9acd3f67c272 100644 --- a/datafusion/core/src/physical_optimizer/utils.rs +++ b/datafusion/core/src/physical_optimizer/utils.rs @@ -27,7 +27,8 @@ use crate::physical_plan::union::UnionExec; use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec}; use crate::physical_plan::{ExecutionPlan, ExecutionPlanProperties}; -use datafusion_physical_expr::{LexRequirement, PhysicalSortRequirement}; +use datafusion_physical_expr::LexRequirement; +use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use datafusion_physical_plan::tree_node::PlanContext; @@ -38,7 +39,7 @@ pub fn add_sort_above( sort_requirements: LexRequirement, fetch: Option, ) -> PlanContext { - let mut sort_expr = PhysicalSortRequirement::to_sort_exprs(sort_requirements); + let mut sort_expr = LexOrdering::from(sort_requirements); sort_expr.inner.retain(|sort_expr| { !node .plan diff --git a/datafusion/physical-expr-common/src/sort_expr.rs b/datafusion/physical-expr-common/src/sort_expr.rs index f91d583215b3..1f7946af1628 100644 --- a/datafusion/physical-expr-common/src/sort_expr.rs +++ b/datafusion/physical-expr-common/src/sort_expr.rs @@ -300,26 +300,14 @@ impl PhysicalSortRequirement { }) } - /// Returns [`PhysicalSortRequirement`] that requires the exact - /// sort of the [`PhysicalSortExpr`]s in `ordering` - /// - /// This method takes `&'a PhysicalSortExpr` to make it easy to - /// use implementing [`ExecutionPlan::required_input_ordering`]. - /// - /// [`ExecutionPlan::required_input_ordering`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html#method.required_input_ordering + #[deprecated(since = "43.0.0", note = "use LexRequirement::from_lex_ordering")] pub fn from_sort_exprs<'a>( ordering: impl IntoIterator, ) -> LexRequirement { let ordering = ordering.into_iter().cloned().collect(); LexRequirement::from_lex_ordering(ordering) } - - /// Converts an iterator of [`PhysicalSortRequirement`] into a Vec - /// of [`PhysicalSortExpr`]s. - /// - /// This function converts `PhysicalSortRequirement` to `PhysicalSortExpr` - /// for each entry in the input. If required ordering is None for an entry - /// default ordering `ASC, NULLS LAST` if given (see the `PhysicalSortExpr::from`). + #[deprecated(since = "43.0.0", note = "use LexOrdering::from_lex_requirement")] pub fn to_sort_exprs( requirements: impl IntoIterator, ) -> LexOrdering { @@ -420,8 +408,8 @@ impl LexOrdering { /// This function converts `PhysicalSortRequirement` to `PhysicalSortExpr` /// for each entry in the input. If required ordering is None for an entry /// default ordering `ASC, NULLS LAST` if given (see the `PhysicalSortExpr::from`). - pub fn from_lex_requirement(requirements: LexRequirement) -> LexOrdering { - requirements + pub fn from_lex_requirement(requirement: LexRequirement) -> LexOrdering { + requirement .into_iter() .map(PhysicalSortExpr::from) .collect() @@ -545,15 +533,10 @@ impl LexRequirement { self.inner.push(physical_sort_requirement) } - /// Create a new [`LexRequirement`] from a vector of [`PhysicalSortExpr`]s. + /// Create a new [`LexRequirement`] from a [`LexOrdering`] /// - /// Returns [`PhysicalSortRequirement`] that requires the exact + /// Returns [`LexRequirement`] that requires the exact /// sort of the [`PhysicalSortExpr`]s in `ordering` - /// - /// This method takes `&'a PhysicalSortExpr` to make it easy to - /// use implementing [`ExecutionPlan::required_input_ordering`]. - /// - /// [`ExecutionPlan::required_input_ordering`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html#method.required_input_ordering pub fn from_lex_ordering(ordering: LexOrdering) -> Self { Self::new( ordering diff --git a/datafusion/physical-expr/src/equivalence/class.rs b/datafusion/physical-expr/src/equivalence/class.rs index 35ff6f685b53..83811fe3be7d 100644 --- a/datafusion/physical-expr/src/equivalence/class.rs +++ b/datafusion/physical-expr/src/equivalence/class.rs @@ -476,11 +476,11 @@ impl EquivalenceGroup { /// sort expressions. pub fn normalize_sort_exprs(&self, sort_exprs: &LexOrdering) -> LexOrdering { // Convert sort expressions to sort requirements: - let sort_reqs = PhysicalSortRequirement::from_sort_exprs(sort_exprs.iter()); + let sort_reqs = LexRequirement::from(sort_exprs.clone()); // Normalize the requirements: let normalized_sort_reqs = self.normalize_sort_requirements(&sort_reqs); // Convert sort requirements back to sort expressions: - PhysicalSortRequirement::to_sort_exprs(normalized_sort_reqs.inner) + LexOrdering::from(normalized_sort_reqs) } /// This function applies the `normalize_sort_requirement` function for all diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index 061e77222f29..06f1e24ed202 100644 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -409,11 +409,11 @@ impl EquivalenceProperties { /// after deduplication. fn normalize_sort_exprs(&self, sort_exprs: &LexOrdering) -> LexOrdering { // Convert sort expressions to sort requirements: - let sort_reqs = PhysicalSortRequirement::from_sort_exprs(sort_exprs.iter()); + let sort_reqs = LexRequirement::from(sort_exprs.clone()); // Normalize the requirements: let normalized_sort_reqs = self.normalize_sort_requirements(&sort_reqs); // Convert sort requirements back to sort expressions: - PhysicalSortRequirement::to_sort_exprs(normalized_sort_reqs) + LexOrdering::from(normalized_sort_reqs) } /// Normalizes the given sort requirements (i.e. `sort_reqs`) using the @@ -454,7 +454,7 @@ impl EquivalenceProperties { /// orderings. pub fn ordering_satisfy(&self, given: &LexOrdering) -> bool { // Convert the given sort expressions to sort requirements: - let sort_requirements = PhysicalSortRequirement::from_sort_exprs(given.iter()); + let sort_requirements = LexRequirement::from(given.clone()); self.ordering_satisfy_requirement(&sort_requirements) } @@ -548,11 +548,11 @@ impl EquivalenceProperties { rhs: &LexOrdering, ) -> Option { // Convert the given sort expressions to sort requirements: - let lhs = PhysicalSortRequirement::from_sort_exprs(lhs); - let rhs = PhysicalSortRequirement::from_sort_exprs(rhs); + let lhs = LexRequirement::from(lhs.clone()); + let rhs = LexRequirement::from(rhs.clone()); let finer = self.get_finer_requirement(&lhs, &rhs); // Convert the chosen sort requirements back to sort expressions: - finer.map(PhysicalSortRequirement::to_sort_exprs) + finer.map(LexOrdering::from) } /// Returns the finer ordering among the requirements `lhs` and `rhs`, diff --git a/datafusion/physical-optimizer/src/output_requirements.rs b/datafusion/physical-optimizer/src/output_requirements.rs index 4f6f91a2348f..6c8e76bff82b 100644 --- a/datafusion/physical-optimizer/src/output_requirements.rs +++ b/datafusion/physical-optimizer/src/output_requirements.rs @@ -33,7 +33,7 @@ use datafusion_physical_plan::{ use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::{Result, Statistics}; -use datafusion_physical_expr::{Distribution, LexRequirement, PhysicalSortRequirement}; +use datafusion_physical_expr::{Distribution, LexRequirement}; use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use datafusion_physical_plan::{ExecutionPlanProperties, PlanProperties}; @@ -256,13 +256,13 @@ fn require_top_ordering_helper( // Therefore; we check the sort expression field of the SortExec to assign the requirements. let req_ordering = sort_exec.expr(); let req_dist = sort_exec.required_input_distribution()[0].clone(); - let reqs = PhysicalSortRequirement::from_sort_exprs(req_ordering); + let reqs = LexRequirement::from(req_ordering.clone()); Ok(( Arc::new(OutputRequirementExec::new(plan, Some(reqs), req_dist)) as _, true, )) } else if let Some(spm) = plan.as_any().downcast_ref::() { - let reqs = PhysicalSortRequirement::from_sort_exprs(spm.expr()); + let reqs = LexRequirement::from(spm.expr().clone()); Ok(( Arc::new(OutputRequirementExec::new( plan, diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index a71039b5733b..2220007fdd72 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -1074,9 +1074,7 @@ pub fn get_finer_aggregate_exprs_requirement( ); } - Ok(PhysicalSortRequirement::from_sort_exprs( - requirement.inner.iter(), - )) + Ok(LexRequirement::from(requirement)) } /// Returns physical expressions for arguments to evaluate against a batch. @@ -2304,7 +2302,7 @@ mod tests { &eq_properties, &AggregateMode::Partial, )?; - let res = PhysicalSortRequirement::to_sort_exprs(res); + let res = LexOrdering::from(res); assert_eq!(res, common_requirement); Ok(()) } diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs index 2f6dc5fa0bc1..1eb6ea632923 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs @@ -51,7 +51,7 @@ use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion_execution::runtime_env::RuntimeEnv; use datafusion_execution::TaskContext; use datafusion_physical_expr::equivalence::join_equivalence_properties; -use datafusion_physical_expr::{PhysicalExprRef, PhysicalSortRequirement}; +use datafusion_physical_expr::PhysicalExprRef; use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement}; use futures::{Stream, StreamExt}; @@ -297,12 +297,8 @@ impl ExecutionPlan for SortMergeJoinExec { fn required_input_ordering(&self) -> Vec> { vec![ - Some(PhysicalSortRequirement::from_sort_exprs( - self.left_sort_exprs.iter(), - )), - Some(PhysicalSortRequirement::from_sort_exprs( - self.right_sort_exprs.iter(), - )), + Some(LexRequirement::from(self.left_sort_exprs.clone())), + Some(LexRequirement::from(self.right_sort_exprs.clone())), ] } diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs index 84c9f03b07be..94ef4d5bc34c 100644 --- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs +++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs @@ -70,7 +70,7 @@ use datafusion_execution::TaskContext; use datafusion_expr::interval_arithmetic::Interval; use datafusion_physical_expr::equivalence::join_equivalence_properties; use datafusion_physical_expr::intervals::cp_solver::ExprIntervalGraph; -use datafusion_physical_expr::{PhysicalExprRef, PhysicalSortRequirement}; +use datafusion_physical_expr::PhysicalExprRef; use ahash::RandomState; use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement}; @@ -415,12 +415,12 @@ impl ExecutionPlan for SymmetricHashJoinExec { vec![ self.left_sort_exprs .as_ref() - .map(LexOrdering::iter) - .map(PhysicalSortRequirement::from_sort_exprs), + .cloned() + .map(LexRequirement::from), self.right_sort_exprs .as_ref() - .map(LexOrdering::iter) - .map(PhysicalSortRequirement::from_sort_exprs), + .cloned() + .map(LexRequirement::from), ] } diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index 5c80322afe0c..9a89db9a5893 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -32,7 +32,6 @@ use crate::{ use datafusion_common::{internal_err, Result}; use datafusion_execution::memory_pool::MemoryConsumer; use datafusion_execution::TaskContext; -use datafusion_physical_expr::PhysicalSortRequirement; use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement}; use log::{debug, trace}; @@ -203,9 +202,7 @@ impl ExecutionPlan for SortPreservingMergeExec { } fn required_input_ordering(&self) -> Vec> { - vec![Some(PhysicalSortRequirement::from_sort_exprs( - self.expr.iter(), - ))] + vec![Some(LexRequirement::from(self.expr.clone()))] } fn maintains_input_order(&self) -> Vec { diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs index da7f6d79e578..24aba011d06c 100644 --- a/datafusion/physical-plan/src/windows/mod.rs +++ b/datafusion/physical-plan/src/windows/mod.rs @@ -516,9 +516,8 @@ pub fn get_window_mode( // Treat partition by exprs as constant. During analysis of requirements are satisfied. let const_exprs = partitionby_exprs.iter().map(ConstExpr::from); let partition_by_eqs = input_eqs.with_constants(const_exprs); - let order_by_reqs = PhysicalSortRequirement::from_sort_exprs(orderby_keys.iter()); - let reverse_order_by_reqs = - PhysicalSortRequirement::from_sort_exprs(reverse_order_bys(orderby_keys).iter()); + let order_by_reqs = LexRequirement::from(orderby_keys.clone()); + let reverse_order_by_reqs = LexRequirement::from(reverse_order_bys(orderby_keys)); for (should_swap, order_by_reqs) in [(false, order_by_reqs), (true, reverse_order_by_reqs)] { diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index e84eae2b9082..64e462d1695f 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -35,7 +35,7 @@ use datafusion::datasource::physical_plan::{AvroExec, CsvExec}; use datafusion::execution::runtime_env::RuntimeEnv; use datafusion::execution::FunctionRegistry; use datafusion::physical_expr::aggregate::AggregateFunctionExpr; -use datafusion::physical_expr::{LexOrdering, PhysicalExprRef, PhysicalSortRequirement}; +use datafusion::physical_expr::{LexOrdering, LexRequirement, PhysicalExprRef}; use datafusion::physical_plan::aggregates::AggregateMode; use datafusion::physical_plan::aggregates::{AggregateExec, PhysicalGroupBy}; use datafusion::physical_plan::analyze::AnalyzeExec; @@ -1037,7 +1037,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { &sink_schema, extension_codec, ) - .map(|item| PhysicalSortRequirement::from_sort_exprs(&item.inner)) + .map(LexRequirement::from) }) .transpose()?; Ok(Arc::new(DataSinkExec::new( @@ -1067,7 +1067,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { &sink_schema, extension_codec, ) - .map(|item| PhysicalSortRequirement::from_sort_exprs(&item.inner)) + .map(LexRequirement::from) }) .transpose()?; Ok(Arc::new(DataSinkExec::new( @@ -1104,9 +1104,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { &sink_schema, extension_codec, ) - .map(|item| { - PhysicalSortRequirement::from_sort_exprs(&item.inner) - }) + .map(LexRequirement::from) }) .transpose()?; Ok(Arc::new(DataSinkExec::new(