From a1d5a228eb054d9679239f235eab1d236f3820f1 Mon Sep 17 00:00:00 2001 From: Tobias Pitters <31857876+CloseChoice@users.noreply.github.com> Date: Wed, 12 Jul 2023 09:10:45 +0200 Subject: [PATCH] feat(rust, python): add `maintain_order` argument to `sort`/`top_k`/`bottom_k` (#9672) --- polars/polars-algo/src/algo.rs | 2 +- .../logical/categorical/ops/unique.rs | 2 +- .../polars-core/src/chunked_array/ops/mod.rs | 2 ++ .../src/chunked_array/ops/sort/categorical.rs | 5 +-- .../src/chunked_array/ops/sort/mod.rs | 17 ++++++--- polars/polars-core/src/frame/groupby/mod.rs | 2 +- .../src/frame/hash_join/sort_merge.rs | 3 ++ polars/polars-core/src/frame/mod.rs | 17 +++++---- polars/polars-core/src/frame/top_k.rs | 13 +++++-- .../src/series/implementations/struct_.rs | 1 + .../src/executors/sinks/sort/sink.rs | 10 +++++- .../src/executors/sinks/sort/sink_multiple.rs | 1 + .../polars-plan/src/logical_plan/builder.rs | 9 ++++- .../polars-plan/src/logical_plan/options.rs | 1 + polars/polars-lazy/src/frame/mod.rs | 19 +++++++--- .../src/physical_plan/executors/sort.rs | 1 + .../src/physical_plan/node_timer.rs | 7 ++-- .../src/physical_plan/streaming/checks.rs | 15 +++++--- polars/polars-lazy/src/tests/aggregations.rs | 5 +++ polars/polars-lazy/src/tests/queries.rs | 28 +++++++++++++-- polars/polars-lazy/src/tests/streaming.rs | 29 +++++++++++++-- polars/polars-lazy/src/tests/tpch.rs | 1 + polars/polars-ops/src/series/ops/various.rs | 2 +- polars/polars-sql/src/context.rs | 2 +- polars/polars-sql/tests/iss_7437.rs | 4 +-- polars/polars-sql/tests/ops_distinct_on.rs | 1 + polars/polars-sql/tests/simple_exprs.rs | 1 + polars/src/docs/lazy.rs | 2 +- polars/tests/it/lazy/expressions/window.rs | 2 +- py-polars/polars/dataframe/frame.py | 26 ++++++++++++-- py-polars/polars/lazyframe/frame.py | 31 +++++++++++++--- py-polars/src/expr/general.rs | 2 ++ py-polars/src/lazyframe.rs | 36 +++++++++++++++---- py-polars/src/series/mod.rs | 1 + py-polars/tests/unit/test_df.py | 18 ++++++++++ 35 files changed, 265 insertions(+), 53 deletions(-) diff --git a/polars/polars-algo/src/algo.rs b/polars/polars-algo/src/algo.rs index 669bdd6c995c..2da0f1787521 100644 --- a/polars/polars-algo/src/algo.rs +++ b/polars/polars-algo/src/algo.rs @@ -97,5 +97,5 @@ pub fn hist(s: &Series, bins: Option<&Series>, bin_count: Option) -> Resu cuts.left_join(&out, [category_str], [category_str])? .fill_null(FillNullStrategy::Zero)? - .sort(["category"], false) + .sort(["category"], false, false) } diff --git a/polars/polars-core/src/chunked_array/logical/categorical/ops/unique.rs b/polars/polars-core/src/chunked_array/logical/categorical/ops/unique.rs index f80a40f6e6df..1f843bee77d3 100644 --- a/polars/polars-core/src/chunked_array/logical/categorical/ops/unique.rs +++ b/polars/polars-core/src/chunked_array/logical/categorical/ops/unique.rs @@ -61,6 +61,6 @@ impl CategoricalChunked { counts.rename("counts"); let cols = vec![values.into_series(), counts.into_series()]; let df = DataFrame::new_no_checks(cols); - df.sort(["counts"], true) + df.sort(["counts"], true, false) } } diff --git a/polars/polars-core/src/chunked_array/ops/mod.rs b/polars/polars-core/src/chunked_array/ops/mod.rs index 91ccac838096..eb3ecd2a8590 100644 --- a/polars/polars-core/src/chunked_array/ops/mod.rs +++ b/polars/polars-core/src/chunked_array/ops/mod.rs @@ -479,6 +479,7 @@ pub struct SortOptions { pub descending: bool, pub nulls_last: bool, pub multithreaded: bool, + pub maintain_order: bool, } #[derive(Clone)] @@ -495,6 +496,7 @@ impl Default for SortOptions { descending: false, nulls_last: false, multithreaded: true, + maintain_order: false, } } } diff --git a/polars/polars-core/src/chunked_array/ops/sort/categorical.rs b/polars/polars-core/src/chunked_array/ops/sort/categorical.rs index 0e8aa031e29d..7c8e106f4059 100644 --- a/polars/polars-core/src/chunked_array/ops/sort/categorical.rs +++ b/polars/polars-core/src/chunked_array/ops/sort/categorical.rs @@ -99,6 +99,7 @@ impl CategoricalChunked { nulls_last: false, descending, multithreaded: true, + maintain_order: false, }) } @@ -202,12 +203,12 @@ mod test { "vals" => [1, 1, 2, 2] ]?; - let out = df.sort(["cat", "vals"], vec![false, false])?; + let out = df.sort(["cat", "vals"], vec![false, false], false)?; let out = out.column("cat")?; let cat = out.categorical()?; assert_order(cat, &["a", "a", "b", "c"]); - let out = df.sort(["vals", "cat"], vec![false, false])?; + let out = df.sort(["vals", "cat"], vec![false, false], false)?; let out = out.column("cat")?; let cat = out.categorical()?; assert_order(cat, &["b", "c", "a", "a"]); diff --git a/polars/polars-core/src/chunked_array/ops/sort/mod.rs b/polars/polars-core/src/chunked_array/ops/sort/mod.rs index 6042b4fa0e96..36a7a80e2f8d 100644 --- a/polars/polars-core/src/chunked_array/ops/sort/mod.rs +++ b/polars/polars-core/src/chunked_array/ops/sort/mod.rs @@ -426,6 +426,7 @@ impl ChunkSort for Utf8Chunked { descending, nulls_last: false, multithreaded: true, + maintain_order: false, }) } @@ -545,6 +546,7 @@ impl ChunkSort for BinaryChunked { descending, nulls_last: false, multithreaded: true, + maintain_order: false, }) } @@ -637,6 +639,7 @@ impl ChunkSort for BooleanChunked { descending, nulls_last: false, multithreaded: true, + maintain_order: false, }) } @@ -775,6 +778,7 @@ mod test { descending: false, nulls_last: false, multithreaded: true, + maintain_order: false, }); assert_eq!( Vec::from(&out), @@ -793,6 +797,7 @@ mod test { descending: false, nulls_last: true, multithreaded: true, + maintain_order: false, }); assert_eq!( Vec::from(&out), @@ -817,7 +822,7 @@ mod test { let c = Utf8Chunked::new("c", &["a", "b", "c", "d", "e", "f", "g", "h"]); let df = DataFrame::new(vec![a.into_series(), b.into_series(), c.into_series()])?; - let out = df.sort(["a", "b", "c"], false)?; + let out = df.sort(["a", "b", "c"], false, false)?; assert_eq!( Vec::from(out.column("b")?.i64()?), &[ @@ -837,7 +842,7 @@ mod test { let b = Int32Chunked::new("b", &[5, 4, 2, 3, 4, 5]).into_series(); let df = DataFrame::new(vec![a, b])?; - let out = df.sort(["a", "b"], false)?; + let out = df.sort(["a", "b"], false, false)?; let expected = df!( "a" => ["a", "a", "b", "b", "c", "c"], "b" => [3, 5, 4, 4, 2, 5] @@ -849,14 +854,14 @@ mod test { "values" => ["a", "a", "b"] )?; - let out = df.sort(["groups", "values"], vec![true, false])?; + let out = df.sort(["groups", "values"], vec![true, false], false)?; let expected = df!( "groups" => [3, 2, 1], "values" => ["b", "a", "a"] )?; assert!(out.frame_equal(&expected)); - let out = df.sort(["values", "groups"], vec![false, true])?; + let out = df.sort(["values", "groups"], vec![false, true], false)?; let expected = df!( "groups" => [2, 1, 3], "values" => ["a", "a", "b"] @@ -873,6 +878,7 @@ mod test { descending: false, nulls_last: false, multithreaded: true, + maintain_order: false, }); let expected = &[None, None, Some("a"), Some("b"), Some("c")]; assert_eq!(Vec::from(&out), expected); @@ -881,6 +887,7 @@ mod test { descending: true, nulls_last: false, multithreaded: true, + maintain_order: false, }); let expected = &[None, None, Some("c"), Some("b"), Some("a")]; @@ -890,6 +897,7 @@ mod test { descending: false, nulls_last: true, multithreaded: true, + maintain_order: false, }); let expected = &[Some("a"), Some("b"), Some("c"), None, None]; assert_eq!(Vec::from(&out), expected); @@ -898,6 +906,7 @@ mod test { descending: true, nulls_last: true, multithreaded: true, + maintain_order: false, }); let expected = &[Some("c"), Some("b"), Some("a"), None, None]; assert_eq!(Vec::from(&out), expected); diff --git a/polars/polars-core/src/frame/groupby/mod.rs b/polars/polars-core/src/frame/groupby/mod.rs index 0e11028aed90..47a71cc7b99a 100644 --- a/polars/polars-core/src/frame/groupby/mod.rs +++ b/polars/polars-core/src/frame/groupby/mod.rs @@ -1080,7 +1080,7 @@ mod test { // Use of deprecated `sum()` for testing purposes #[allow(deprecated)] let res = df.groupby(["flt"]).unwrap().sum().unwrap(); - let res = res.sort(["flt"], false).unwrap(); + let res = res.sort(["flt"], false, false).unwrap(); assert_eq!( Vec::from(res.column("val_sum").unwrap().i32().unwrap()), &[Some(2), Some(2), Some(1)] diff --git a/polars/polars-core/src/frame/hash_join/sort_merge.rs b/polars/polars-core/src/frame/hash_join/sort_merge.rs index f5d2a575722f..46a1cc7e48ce 100644 --- a/polars/polars-core/src/frame/hash_join/sort_merge.rs +++ b/polars/polars-core/src/frame/hash_join/sort_merge.rs @@ -224,6 +224,7 @@ pub fn _sort_or_hash_inner( descending: false, nulls_last: false, multithreaded: true, + maintain_order: false, }); let s_right = unsafe { s_right.take_unchecked(&sort_idx).unwrap() }; let ids = par_sorted_merge_inner_no_nulls(s_left, &s_right); @@ -250,6 +251,7 @@ pub fn _sort_or_hash_inner( descending: false, nulls_last: false, multithreaded: true, + maintain_order: false, }); let s_left = unsafe { s_left.take_unchecked(&sort_idx).unwrap() }; let ids = par_sorted_merge_inner_no_nulls(&s_left, s_right); @@ -318,6 +320,7 @@ pub(super) fn sort_or_hash_left( descending: false, nulls_last: false, multithreaded: true, + maintain_order: false, }); let s_right = unsafe { s_right.take_unchecked(&sort_idx).unwrap() }; diff --git a/polars/polars-core/src/frame/mod.rs b/polars/polars-core/src/frame/mod.rs index 0e57ab298dc2..146af7f8445c 100644 --- a/polars/polars-core/src/frame/mod.rs +++ b/polars/polars-core/src/frame/mod.rs @@ -1841,11 +1841,12 @@ impl DataFrame { &mut self, by_column: impl IntoVec, descending: impl IntoVec, + maintain_order: bool, ) -> PolarsResult<&mut Self> { let by_column = self.select_series(by_column)?; let descending = descending.into_vec(); self.columns = self - .sort_impl(by_column, descending, false, None, true)? + .sort_impl(by_column, descending, false, maintain_order, None, true)? .columns; Ok(self) } @@ -1856,6 +1857,7 @@ impl DataFrame { by_column: Vec, descending: Vec, nulls_last: bool, + maintain_order: bool, slice: Option<(i64, usize)>, parallel: bool, ) -> PolarsResult { @@ -1890,7 +1892,7 @@ impl DataFrame { } if let Some((0, k)) = slice { - return self.top_k_impl(k, descending, by_column, nulls_last); + return self.top_k_impl(k, descending, by_column, nulls_last, maintain_order); } #[cfg(feature = "dtype-struct")] @@ -1912,6 +1914,7 @@ impl DataFrame { descending: descending[0], nulls_last, multithreaded: parallel, + maintain_order, }; // fast path for a frame with a single series // no need to compute the sort indices and then take by these indices @@ -1959,20 +1962,21 @@ impl DataFrame { /// ``` /// # use polars_core::prelude::*; /// fn sort_example(df: &DataFrame, descending: bool) -> PolarsResult { - /// df.sort(["a"], descending) + /// df.sort(["a"], descending, false) /// } /// /// fn sort_by_multiple_columns_example(df: &DataFrame) -> PolarsResult { - /// df.sort(&["a", "b"], vec![false, true]) + /// df.sort(&["a", "b"], vec![false, true], false) /// } /// ``` pub fn sort( &self, by_column: impl IntoVec, descending: impl IntoVec, + maintain_order: bool, ) -> PolarsResult { let mut df = self.clone(); - df.sort_in_place(by_column, descending)?; + df.sort_in_place(by_column, descending, maintain_order)?; Ok(df) } @@ -1986,6 +1990,7 @@ impl DataFrame { by_column, descending, options.nulls_last, + options.maintain_order, None, options.multithreaded, )? @@ -3603,7 +3608,7 @@ mod test { let df = df .unique_stable(None, UniqueKeepStrategy::First, None) .unwrap() - .sort(["flt"], false) + .sort(["flt"], false, false) .unwrap(); let valid = df! { "flt" => [1., 2., 3.], diff --git a/polars/polars-core/src/frame/top_k.rs b/polars/polars-core/src/frame/top_k.rs index cb932c39e132..fbcb8941f918 100644 --- a/polars/polars-core/src/frame/top_k.rs +++ b/polars/polars-core/src/frame/top_k.rs @@ -46,7 +46,7 @@ impl DataFrame { ) -> PolarsResult { let by_column = self.select_series(by_column)?; let descending = descending.into_vec(); - self.top_k_impl(k, descending, by_column, false) + self.top_k_impl(k, descending, by_column, false, false) } pub(crate) fn top_k_impl( @@ -55,6 +55,7 @@ impl DataFrame { mut descending: Vec, by_column: Vec, nulls_last: bool, + maintain_order: bool, ) -> PolarsResult { _broadcast_descending(by_column.len(), &mut descending); let encoded = _get_rows_encoded(&by_column, &descending, nulls_last)?; @@ -66,8 +67,16 @@ impl DataFrame { .collect::>(); let sorted = if k >= self.height() { - rows.sort_unstable(); + if maintain_order { + rows.sort(); + } else { + rows.sort_unstable(); + } &rows + } else if maintain_order { + // todo: maybe there is some more efficient method, comparable to select_nth_unstable + rows.sort(); + &rows[..k] } else { let (lower, _el, _upper) = rows.select_nth_unstable(k); lower.sort_unstable(); diff --git a/polars/polars-core/src/series/implementations/struct_.rs b/polars/polars-core/src/series/implementations/struct_.rs index 811bc3fc0f2f..87cedae6d3ec 100644 --- a/polars/polars-core/src/series/implementations/struct_.rs +++ b/polars/polars-core/src/series/implementations/struct_.rs @@ -367,6 +367,7 @@ impl SeriesTrait for SeriesWrap { df.columns.clone(), desc, options.nulls_last, + options.maintain_order, None, options.multithreaded, ) diff --git a/polars/polars-lazy/polars-pipe/src/executors/sinks/sort/sink.rs b/polars/polars-lazy/polars-pipe/src/executors/sinks/sort/sink.rs index a463baadceec..752e507517a8 100644 --- a/polars/polars-lazy/polars-pipe/src/executors/sinks/sort/sink.rs +++ b/polars/polars-lazy/polars-pipe/src/executors/sinks/sort/sink.rs @@ -175,6 +175,7 @@ impl Sink for SortSink { descending: self.sort_args.descending[0], nulls_last: self.sort_args.nulls_last, multithreaded: true, + maintain_order: self.sort_args.maintain_order, }); block_thread_until_io_thread_done(io_thread); @@ -216,5 +217,12 @@ pub(super) fn sort_accumulated( slice: Option<(i64, usize)>, ) -> PolarsResult { let sort_column = df.get_columns()[sort_idx].clone(); - df.sort_impl(vec![sort_column], vec![descending], false, slice, true) + df.sort_impl( + vec![sort_column], + vec![descending], + false, + false, + slice, + true, + ) } diff --git a/polars/polars-lazy/polars-pipe/src/executors/sinks/sort/sink_multiple.rs b/polars/polars-lazy/polars-pipe/src/executors/sinks/sort/sink_multiple.rs index 19133bb93ccd..18e6caa5f4d8 100644 --- a/polars/polars-lazy/polars-pipe/src/executors/sinks/sort/sink_multiple.rs +++ b/polars/polars-lazy/polars-pipe/src/executors/sinks/sort/sink_multiple.rs @@ -170,6 +170,7 @@ impl SortSinkMultiple { descending: vec![false], nulls_last: false, slice: sort_args.slice, + maintain_order: false, }, Arc::new(schema), )); diff --git a/polars/polars-lazy/polars-plan/src/logical_plan/builder.rs b/polars/polars-lazy/polars-plan/src/logical_plan/builder.rs index a34c3c0a12c8..493baf93c92e 100644 --- a/polars/polars-lazy/polars-plan/src/logical_plan/builder.rs +++ b/polars/polars-lazy/polars-plan/src/logical_plan/builder.rs @@ -633,7 +633,13 @@ impl LogicalPlanBuilder { .into() } - pub fn sort(self, by_column: Vec, descending: Vec, null_last: bool) -> Self { + pub fn sort( + self, + by_column: Vec, + descending: Vec, + null_last: bool, + maintain_order: bool, + ) -> Self { let schema = try_delayed!(self.0.schema(), &self.0, into); let by_column = try_delayed!(rewrite_projections(by_column, &schema, &[]), &self.0, into); LogicalPlan::Sort { @@ -643,6 +649,7 @@ impl LogicalPlanBuilder { descending, nulls_last: null_last, slice: None, + maintain_order, }, } .into() diff --git a/polars/polars-lazy/polars-plan/src/logical_plan/options.rs b/polars/polars-lazy/polars-plan/src/logical_plan/options.rs index 2f16a0a32b93..8bb27eccb702 100644 --- a/polars/polars-lazy/polars-plan/src/logical_plan/options.rs +++ b/polars/polars-lazy/polars-plan/src/logical_plan/options.rs @@ -254,6 +254,7 @@ pub struct SortArguments { pub descending: Vec, pub nulls_last: bool, pub slice: Option<(i64, usize)>, + pub maintain_order: bool, } #[derive(Clone, PartialEq, Eq, Debug, Default)] diff --git a/polars/polars-lazy/src/frame/mod.rs b/polars/polars-lazy/src/frame/mod.rs index 7762a08116e5..b7a305c3445b 100644 --- a/polars/polars-lazy/src/frame/mod.rs +++ b/polars/polars-lazy/src/frame/mod.rs @@ -220,11 +220,17 @@ impl LazyFrame { pub fn sort(self, by_column: &str, options: SortOptions) -> Self { let descending = options.descending; let nulls_last = options.nulls_last; + let maintain_order = options.maintain_order; let opt_state = self.get_opt_state(); let lp = self .get_plan_builder() - .sort(vec![col(by_column)], vec![descending], nulls_last) + .sort( + vec![col(by_column)], + vec![descending], + nulls_last, + maintain_order, + ) .build(); Self::from_logical_plan(lp, opt_state) } @@ -240,7 +246,7 @@ impl LazyFrame { /// /// Sort DataFrame by 'sepal.width' column /// fn example(df: DataFrame) -> LazyFrame { /// df.lazy() - /// .sort_by_exprs(vec![col("sepal.width")], vec![false], false) + /// .sort_by_exprs(vec![col("sepal.width")], vec![false], false, false) /// } /// ``` pub fn sort_by_exprs, B: AsRef<[bool]>>( @@ -248,6 +254,7 @@ impl LazyFrame { by_exprs: E, descending: B, nulls_last: bool, + maintain_order: bool, ) -> Self { let by_exprs = by_exprs.as_ref().to_vec(); let descending = descending.as_ref().to_vec(); @@ -257,7 +264,7 @@ impl LazyFrame { let opt_state = self.get_opt_state(); let lp = self .get_plan_builder() - .sort(by_exprs, descending, nulls_last) + .sort(by_exprs, descending, nulls_last, maintain_order) .build(); Self::from_logical_plan(lp, opt_state) } @@ -269,6 +276,7 @@ impl LazyFrame { by_exprs: E, descending: B, nulls_last: bool, + maintain_order: bool, ) -> Self { let mut descending = descending.as_ref().to_vec(); // top-k is reverse from sort @@ -276,7 +284,7 @@ impl LazyFrame { *v = !*v; } // this will optimize to top-k - self.sort_by_exprs(by_exprs, descending, nulls_last) + self.sort_by_exprs(by_exprs, descending, nulls_last, maintain_order) .slice(0, k) } @@ -286,10 +294,11 @@ impl LazyFrame { by_exprs: E, descending: B, nulls_last: bool, + maintain_order: bool, ) -> Self { let descending = descending.as_ref().to_vec(); // this will optimize to bottom-k - self.sort_by_exprs(by_exprs, descending, nulls_last) + self.sort_by_exprs(by_exprs, descending, nulls_last, maintain_order) .slice(0, k) } diff --git a/polars/polars-lazy/src/physical_plan/executors/sort.rs b/polars/polars-lazy/src/physical_plan/executors/sort.rs index 5c9e43f8b07d..2d34175d7212 100644 --- a/polars/polars-lazy/src/physical_plan/executors/sort.rs +++ b/polars/polars-lazy/src/physical_plan/executors/sort.rs @@ -35,6 +35,7 @@ impl SortExec { by_columns, std::mem::take(&mut self.args.descending), self.args.nulls_last, + self.args.maintain_order, self.args.slice, true, ) diff --git a/polars/polars-lazy/src/physical_plan/node_timer.rs b/polars/polars-lazy/src/physical_plan/node_timer.rs index 292f51b451a3..8be6861dda39 100644 --- a/polars/polars-lazy/src/physical_plan/node_timer.rs +++ b/polars/polars-lazy/src/physical_plan/node_timer.rs @@ -57,7 +57,10 @@ impl NodeTimer { let mut end = end.into_inner(); end.rename("end"); - DataFrame::new_no_checks(vec![nodes_s, start.into_series(), end.into_series()]) - .sort(vec!["start"], vec![false]) + DataFrame::new_no_checks(vec![nodes_s, start.into_series(), end.into_series()]).sort( + vec!["start"], + vec![false], + false, + ) } } diff --git a/polars/polars-lazy/src/physical_plan/streaming/checks.rs b/polars/polars-lazy/src/physical_plan/streaming/checks.rs index e85500acc5ac..70c00053ef02 100644 --- a/polars/polars-lazy/src/physical_plan/streaming/checks.rs +++ b/polars/polars-lazy/src/physical_plan/streaming/checks.rs @@ -2,10 +2,17 @@ use polars_core::prelude::{JoinArgs, JoinType}; use polars_plan::prelude::*; pub(super) fn is_streamable_sort(args: &SortArguments) -> bool { - // check if slice is positive - match args.slice { - Some((offset, _)) => offset >= 0, - None => true, + // check if slice is positive or maintain order is true + match args { + SortArguments { + maintain_order: true, + .. + } => false, + SortArguments { + slice: Some((offset, _)), + .. + } => *offset >= 0, + SortArguments { slice: None, .. } => true, } } diff --git a/polars/polars-lazy/src/tests/aggregations.rs b/polars/polars-lazy/src/tests/aggregations.rs index 6a0833920526..7c01ad136cb3 100644 --- a/polars/polars-lazy/src/tests/aggregations.rs +++ b/polars/polars-lazy/src/tests/aggregations.rs @@ -443,6 +443,7 @@ fn take_aggregations() -> PolarsResult<()> { descending: true, nulls_last: false, multithreaded: true, + maintain_order: false, }) .head(Some(2)), ) @@ -481,6 +482,7 @@ fn test_take_consistency() -> PolarsResult<()> { descending: true, nulls_last: false, multithreaded: true, + maintain_order: false, }) .take(lit(0))]) .collect()?; @@ -498,6 +500,7 @@ fn test_take_consistency() -> PolarsResult<()> { descending: true, nulls_last: false, multithreaded: true, + maintain_order: false, }) .take(lit(0))]) .collect()?; @@ -517,6 +520,7 @@ fn test_take_consistency() -> PolarsResult<()> { descending: true, nulls_last: false, multithreaded: true, + maintain_order: false, }) .take(lit(0)) .alias("1"), @@ -527,6 +531,7 @@ fn test_take_consistency() -> PolarsResult<()> { descending: true, nulls_last: false, multithreaded: true, + maintain_order: false, }) .take(lit(0)), ) diff --git a/polars/polars-lazy/src/tests/queries.rs b/polars/polars-lazy/src/tests/queries.rs index be269f3e6fe2..179feef17973 100644 --- a/polars/polars-lazy/src/tests/queries.rs +++ b/polars/polars-lazy/src/tests/queries.rs @@ -777,7 +777,7 @@ fn test_lazy_groupby_sort() { .agg([col("b").sort(false).first()]) .collect() .unwrap() - .sort(["a"], false) + .sort(["a"], false, false) .unwrap(); assert_eq!( @@ -791,7 +791,7 @@ fn test_lazy_groupby_sort() { .agg([col("b").sort(false).last()]) .collect() .unwrap() - .sort(["a"], false) + .sort(["a"], false, false) .unwrap(); assert_eq!( @@ -815,7 +815,7 @@ fn test_lazy_groupby_sort_by() { .agg([col("b").sort_by([col("c")], [true]).first()]) .collect() .unwrap() - .sort(["a"], false) + .sort(["a"], false, false) .unwrap(); assert_eq!( @@ -900,6 +900,7 @@ fn test_lazy_groupby_filter() -> PolarsResult<()> { descending: false, nulls_last: false, multithreaded: true, + maintain_order: false, }, ) .collect()?; @@ -1627,6 +1628,7 @@ fn test_single_group_result() -> PolarsResult<()> { descending: false, nulls_last: false, multithreaded: true, + maintain_order: false, }) .over([col("a")])]) .collect()?; @@ -1888,3 +1890,23 @@ fn test_partitioned_gb_ternary() -> PolarsResult<()> { Ok(()) } + +#[test] +fn test_sort_maintain_order_true() -> PolarsResult<()> { + let q = df![ + "A" => [1, 1, 1, 1], + "B" => ["A", "B", "C", "D"], + ]? + .lazy(); + + let res = q + .sort_by_exprs([col("A")], [false], false, true) + .slice(0, 3) + .collect()?; + println!("{:?}", res); + assert!(res.frame_equal(&df![ + "A" => [1, 1, 1], + "B" => ["A", "B", "C"], + ]?)); + Ok(()) +} diff --git a/polars/polars-lazy/src/tests/streaming.rs b/polars/polars-lazy/src/tests/streaming.rs index 2eed76189868..7139b8a49efa 100644 --- a/polars/polars-lazy/src/tests/streaming.rs +++ b/polars/polars-lazy/src/tests/streaming.rs @@ -100,7 +100,12 @@ fn test_streaming_multiple_keys_aggregate() -> PolarsResult<()> { (col("fats_g") * lit(10)).sum(), col("calories").mean().alias("cal_mean"), ]) - .sort_by_exprs([col("sugars_g"), col("calories")], [false, false], false); + .sort_by_exprs( + [col("sugars_g"), col("calories")], + [false, false], + false, + false, + ); assert_streaming_with_default(q, true, false); Ok(()) @@ -130,7 +135,7 @@ fn test_streaming_unique() -> PolarsResult<()> { let q = q .select([col("sugars_g"), col("calories")]) .unique(None, Default::default()) - .sort_by_exprs([cols(["sugars_g", "calories"])], [false], false); + .sort_by_exprs([cols(["sugars_g", "calories"])], [false], false, false); assert_streaming_with_default(q, true, false); Ok(()) @@ -364,3 +369,23 @@ fn test_streaming_double_left_join() -> PolarsResult<()> { assert_streaming_with_default(q, true, false); Ok(()) } + +#[test] +fn test_sort_maintain_order_streaming() -> PolarsResult<()> { + let q = df![ + "A" => [1, 1, 1, 1], + "B" => ["A", "B", "C", "D"], + ]? + .lazy(); + + let res = q + .sort_by_exprs([col("A")], [false], false, true) + .slice(0, 3) + .with_streaming(true) + .collect()?; + assert!(res.frame_equal(&df![ + "A" => [1, 1, 1], + "B" => ["A", "B", "C"], + ]?)); + Ok(()) +} diff --git a/polars/polars-lazy/src/tests/tpch.rs b/polars/polars-lazy/src/tests/tpch.rs index eeeee544ba92..6606569a930d 100644 --- a/polars/polars-lazy/src/tests/tpch.rs +++ b/polars/polars-lazy/src/tests/tpch.rs @@ -80,6 +80,7 @@ fn test_q2() -> PolarsResult<()> { [cols(["s_acctbal", "n_name", "s_name", "p_partkey"])], [true, false, false, false], false, + false, ) .limit(100) .with_common_subplan_elimination(true); diff --git a/polars/polars-ops/src/series/ops/various.rs b/polars/polars-ops/src/series/ops/various.rs index 695dd7f25fd6..5bf22ad0f069 100644 --- a/polars/polars-ops/src/series/ops/various.rs +++ b/polars/polars-ops/src/series/ops/various.rs @@ -17,7 +17,7 @@ pub trait SeriesMethods: SeriesSealed { let cols = vec![values, counts.into_series()]; let df = DataFrame::new_no_checks(cols); if sorted { - df.sort(["counts"], true) + df.sort(["counts"], true, false) } else { Ok(df) } diff --git a/polars/polars-sql/src/context.rs b/polars/polars-sql/src/context.rs index 3ce982559c1e..6b649efdc207 100644 --- a/polars/polars-sql/src/context.rs +++ b/polars/polars-sql/src/context.rs @@ -477,7 +477,7 @@ impl SQLContext { ); } - Ok(lf.sort_by_exprs(&by, descending, false)) + Ok(lf.sort_by_exprs(&by, descending, false, false)) } fn process_groupby( diff --git a/polars/polars-sql/tests/iss_7437.rs b/polars/polars-sql/tests/iss_7437.rs index 7a4e3f25a75e..a8479d92691e 100644 --- a/polars/polars-sql/tests/iss_7437.rs +++ b/polars/polars-sql/tests/iss_7437.rs @@ -21,14 +21,14 @@ fn iss_7437() -> PolarsResult<()> { "#, )? .collect()? - .sort(["category"], vec![false])?; + .sort(["category"], vec![false], false)?; let expected = LazyCsvReader::new("../../examples/datasets/foods1.csv") .finish()? .groupby(vec![col("category").alias("category")]) .agg(vec![]) .collect()? - .sort(["category"], vec![false])?; + .sort(["category"], vec![false], false)?; assert!(df_sql.frame_equal(&expected)); Ok(()) diff --git a/polars/polars-sql/tests/ops_distinct_on.rs b/polars/polars-sql/tests/ops_distinct_on.rs index 64c94da32108..9497cf63c530 100644 --- a/polars/polars-sql/tests/ops_distinct_on.rs +++ b/polars/polars-sql/tests/ops_distinct_on.rs @@ -32,6 +32,7 @@ fn test_distinct_on() { vec![col("Name"), col("Record Date")], vec![false, true], true, + false, ) .groupby_stable(vec![col("Name")]) .agg(vec![col("*").first()]); diff --git a/polars/polars-sql/tests/simple_exprs.rs b/polars/polars-sql/tests/simple_exprs.rs index 70b173e2e6de..4e5dab9869c4 100644 --- a/polars/polars-sql/tests/simple_exprs.rs +++ b/polars/polars-sql/tests/simple_exprs.rs @@ -496,6 +496,7 @@ fn test_groupby_2() -> PolarsResult<()> { vec![col("count"), col("category")], vec![false, true], false, + false, ) .limit(2); let expected = expected.collect()?; diff --git a/polars/src/docs/lazy.rs b/polars/src/docs/lazy.rs index 2cc5aa8064cc..0e72c5a540c4 100644 --- a/polars/src/docs/lazy.rs +++ b/polars/src/docs/lazy.rs @@ -85,7 +85,7 @@ //! let descending = vec![true, false]; //! //! let sorted = df.lazy() -//! .sort_by_exprs(vec![col("b"), col("a")], descending, false) +//! .sort_by_exprs(vec![col("b"), col("a")], descending, false, false) //! .collect()?; //! //! // sorted: diff --git a/polars/tests/it/lazy/expressions/window.rs b/polars/tests/it/lazy/expressions/window.rs index 9985e3f9ad67..30bc1bde1e6e 100644 --- a/polars/tests/it/lazy/expressions/window.rs +++ b/polars/tests/it/lazy/expressions/window.rs @@ -268,7 +268,7 @@ fn test_window_mapping() -> PolarsResult<()> { // now sorted // this will trigger a fast path - let df = df.sort(["fruits"], vec![false])?; + let df = df.sort(["fruits"], vec![false], false)?; let out = df .clone() diff --git a/py-polars/polars/dataframe/frame.py b/py-polars/polars/dataframe/frame.py index 04758c45f590..ebd6445761b5 100644 --- a/py-polars/polars/dataframe/frame.py +++ b/py-polars/polars/dataframe/frame.py @@ -4022,6 +4022,7 @@ def top_k( by: IntoExpr | Iterable[IntoExpr], descending: bool | Sequence[bool] = False, nulls_last: bool = False, + maintain_order: bool = False, ) -> DataFrame: """ Return the `k` largest elements. @@ -4040,6 +4041,10 @@ def top_k( per column by passing a sequence of booleans. nulls_last Place null values last. + maintain_order + Whether the order should be maintained if elements are equal. + Note that if `true` streaming is not possible and performance might be + worse since this requires a stable search. See Also -------- @@ -4087,7 +4092,13 @@ def top_k( """ return ( self.lazy() - .top_k(k, by=by, descending=descending, nulls_last=nulls_last) + .top_k( + k, + by=by, + descending=descending, + nulls_last=nulls_last, + maintain_order=maintain_order, + ) .collect( projection_pushdown=False, predicate_pushdown=False, @@ -4103,6 +4114,7 @@ def bottom_k( by: IntoExpr | Iterable[IntoExpr], descending: bool | Sequence[bool] = False, nulls_last: bool = False, + maintain_order: bool = False, ) -> DataFrame: """ Return the `k` smallest elements. @@ -4121,6 +4133,10 @@ def bottom_k( per column by passing a sequence of booleans. nulls_last Place null values last. + maintain_order + Whether the order should be maintained if elements are equal. + Note that if `true` streaming is not possible and performance might be + worse since this requires a stable search. See Also -------- @@ -4168,7 +4184,13 @@ def bottom_k( """ return ( self.lazy() - .bottom_k(k, by=by, descending=descending, nulls_last=nulls_last) + .bottom_k( + k, + by=by, + descending=descending, + nulls_last=nulls_last, + maintain_order=maintain_order, + ) .collect( projection_pushdown=False, predicate_pushdown=False, diff --git a/py-polars/polars/lazyframe/frame.py b/py-polars/polars/lazyframe/frame.py index 4a65fb814528..66d91f95fa87 100644 --- a/py-polars/polars/lazyframe/frame.py +++ b/py-polars/polars/lazyframe/frame.py @@ -1030,6 +1030,7 @@ def sort( *more_by: IntoExpr, descending: bool | Sequence[bool] = False, nulls_last: bool = False, + maintain_order: bool = False, ) -> Self: """ Sort the dataframe by the given columns. @@ -1046,6 +1047,10 @@ def sort( per column by passing a sequence of booleans. nulls_last Place null values last. + maintain_order + Whether the order should be maintained if elements are equal. + Note that if `true` streaming is not possible and performance might be + worse since this requires a stable search. Examples -------- @@ -1115,7 +1120,9 @@ def sort( """ # Fast path for sorting by a single existing column if isinstance(by, str) and not more_by: - return self._from_pyldf(self._ldf.sort(by, descending, nulls_last)) + return self._from_pyldf( + self._ldf.sort(by, descending, nulls_last, maintain_order) + ) by = parse_as_list_of_expressions(by, *more_by) @@ -1125,7 +1132,9 @@ def sort( raise ValueError( f"the length of `descending` ({len(descending)}) does not match the length of `by` ({len(by)})" ) - return self._from_pyldf(self._ldf.sort_by_exprs(by, descending, nulls_last)) + return self._from_pyldf( + self._ldf.sort_by_exprs(by, descending, nulls_last, maintain_order) + ) def top_k( self, @@ -1134,6 +1143,7 @@ def top_k( by: IntoExpr | Iterable[IntoExpr], descending: bool | Sequence[bool] = False, nulls_last: bool = False, + maintain_order: bool = False, ) -> Self: """ Return the `k` largest elements. @@ -1152,6 +1162,10 @@ def top_k( per column by passing a sequence of booleans. nulls_last Place null values last. + maintain_order + Whether the order should be maintained if elements are equal. + Note that if `true` streaming is not possible and performance might + be worse since this requires a stable search. See Also -------- @@ -1204,7 +1218,9 @@ def top_k( raise ValueError( f"the length of `descending` ({len(descending)}) does not match the length of `by` ({len(by)})" ) - return self._from_pyldf(self._ldf.top_k(k, by, descending, nulls_last)) + return self._from_pyldf( + self._ldf.top_k(k, by, descending, nulls_last, maintain_order) + ) def bottom_k( self, @@ -1213,6 +1229,7 @@ def bottom_k( by: IntoExpr | Iterable[IntoExpr], descending: bool | Sequence[bool] = False, nulls_last: bool = False, + maintain_order: bool = False, ) -> Self: """ Return the `k` smallest elements. @@ -1231,6 +1248,10 @@ def bottom_k( per column by passing a sequence of booleans. nulls_last Place null values last. + maintain_order + Whether the order should be maintained if elements are equal. + Note that if `true` streaming is not possible and performance might be + worse since this requires a stable search. See Also -------- @@ -1279,7 +1300,9 @@ def bottom_k( by = parse_as_list_of_expressions(by) if isinstance(descending, bool): descending = [descending] - return self._from_pyldf(self._ldf.bottom_k(k, by, descending, nulls_last)) + return self._from_pyldf( + self._ldf.bottom_k(k, by, descending, nulls_last, maintain_order) + ) def profile( self, diff --git a/py-polars/src/expr/general.rs b/py-polars/src/expr/general.rs index 53242cd2abde..a91107694623 100644 --- a/py-polars/src/expr/general.rs +++ b/py-polars/src/expr/general.rs @@ -242,6 +242,7 @@ impl PyExpr { descending, nulls_last, multithreaded: true, + maintain_order: false, }) .into() } @@ -253,6 +254,7 @@ impl PyExpr { descending, nulls_last, multithreaded: true, + maintain_order: false, }) .into() } diff --git a/py-polars/src/lazyframe.rs b/py-polars/src/lazyframe.rs index 98cee161b9f2..a44905d50099 100644 --- a/py-polars/src/lazyframe.rs +++ b/py-polars/src/lazyframe.rs @@ -354,7 +354,13 @@ impl PyLazyFrame { ldf.into() } - fn sort(&self, by_column: &str, descending: bool, nulls_last: bool) -> Self { + fn sort( + &self, + by_column: &str, + descending: bool, + nulls_last: bool, + maintain_order: bool, + ) -> Self { let ldf = self.ldf.clone(); ldf.sort( by_column, @@ -362,21 +368,37 @@ impl PyLazyFrame { descending, nulls_last, multithreaded: true, + maintain_order, }, ) .into() } - fn sort_by_exprs(&self, by: Vec, descending: Vec, nulls_last: bool) -> Self { + fn sort_by_exprs( + &self, + by: Vec, + descending: Vec, + nulls_last: bool, + maintain_order: bool, + ) -> Self { let ldf = self.ldf.clone(); let exprs = by.to_exprs(); - ldf.sort_by_exprs(exprs, descending, nulls_last).into() + ldf.sort_by_exprs(exprs, descending, nulls_last, maintain_order) + .into() } - fn top_k(&self, k: IdxSize, by: Vec, descending: Vec, nulls_last: bool) -> Self { + fn top_k( + &self, + k: IdxSize, + by: Vec, + descending: Vec, + nulls_last: bool, + maintain_order: bool, + ) -> Self { let ldf = self.ldf.clone(); let exprs = by.to_exprs(); - ldf.top_k(k, exprs, descending, nulls_last).into() + ldf.top_k(k, exprs, descending, nulls_last, maintain_order) + .into() } fn bottom_k( @@ -385,10 +407,12 @@ impl PyLazyFrame { by: Vec, descending: Vec, nulls_last: bool, + maintain_order: bool, ) -> Self { let ldf = self.ldf.clone(); let exprs = by.to_exprs(); - ldf.bottom_k(k, exprs, descending, nulls_last).into() + ldf.bottom_k(k, exprs, descending, nulls_last, maintain_order) + .into() } fn cache(&self) -> Self { diff --git a/py-polars/src/series/mod.rs b/py-polars/src/series/mod.rs index 9dd68daaa83c..6c208541bd23 100644 --- a/py-polars/src/series/mod.rs +++ b/py-polars/src/series/mod.rs @@ -656,6 +656,7 @@ impl PySeries { descending, nulls_last: descending, multithreaded: true, + maintain_order: false, }; Ok(self.series.is_sorted(options).map_err(PyPolarsErr::from)?) } diff --git a/py-polars/tests/unit/test_df.py b/py-polars/tests/unit/test_df.py index dc74643c05be..45ee80187f1c 100644 --- a/py-polars/tests/unit/test_df.py +++ b/py-polars/tests/unit/test_df.py @@ -483,6 +483,24 @@ def test_sort() -> None: ) +def test_sort_maintain_order() -> None: + l1 = ( + pl.LazyFrame({"A": [1] * 4, "B": ["A", "B", "C", "D"]}) + .sort("A", maintain_order=True) + .slice(0, 3) + .collect()["B"] + .to_list() + ) + l2 = ( + pl.LazyFrame({"A": [1] * 4, "B": ["A", "B", "C", "D"]}) + .sort("A") + .collect() + .slice(0, 3)["B"] + .to_list() + ) + assert l1 == l2 == ["A", "B", "C"] + + def test_replace() -> None: df = pl.DataFrame({"a": [2, 1, 3], "b": [1, 2, 3]}) s = pl.Series("c", [True, False, True])