From e70d0431bad38187e022836eebf5a77f1ea0c2a2 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 11 Oct 2023 10:21:28 -0400 Subject: [PATCH 1/4] Minor: Assert streaming_merge has non empty sort exprs --- datafusion/physical-plan/src/sorts/cursor.rs | 16 ++++++++++++---- datafusion/physical-plan/src/sorts/merge.rs | 8 +++++++- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/cursor.rs b/datafusion/physical-plan/src/sorts/cursor.rs index baa417649fb0..660d22aec429 100644 --- a/datafusion/physical-plan/src/sorts/cursor.rs +++ b/datafusion/physical-plan/src/sorts/cursor.rs @@ -48,16 +48,17 @@ impl std::fmt::Debug for RowCursor { impl RowCursor { /// Create a new SortKeyCursor from `rows` and a `reservation` - /// that tracks its memory. + /// that tracks its memory. Thre must be at least one row /// /// Panic's if the reservation is not for exactly `rows.size()` - /// bytes + /// bytes or if `rows` is empty. pub fn new(rows: Rows, reservation: MemoryReservation) -> Self { assert_eq!( rows.size(), reservation.size(), "memory reservation mismatch" ); + assert!(rows.num_rows() > 0); Self { cur_row: 0, num_rows: rows.num_rows(), @@ -92,7 +93,10 @@ impl Ord for RowCursor { } } -/// A cursor into a sorted batch of rows +/// A cursor into a sorted batch of rows. +/// +/// Each cursor must have at least one row so `advance` can be called at least +/// once prior to calling `is_finished`. pub trait Cursor: Ord { /// Returns true if there are no more rows in this cursor fn is_finished(&self) -> bool; @@ -207,8 +211,12 @@ pub struct FieldCursor { } impl FieldCursor { - /// Create a new [`FieldCursor`] from the provided `values` sorted according to `options` + /// Create a new [`FieldCursor`] from the provided `values` sorted according + /// to `options`. 
+ /// + /// Panics if the array is empty pub fn new>(options: SortOptions, array: &A) -> Self { + assert!(array.len() > 0, "Empty array passed to FieldValues"); let null_threshold = match options.nulls_first { true => array.null_count(), false => array.len() - array.null_count(), diff --git a/datafusion/physical-plan/src/sorts/merge.rs b/datafusion/physical-plan/src/sorts/merge.rs index 67685509abe5..fe1749286de9 100644 --- a/datafusion/physical-plan/src/sorts/merge.rs +++ b/datafusion/physical-plan/src/sorts/merge.rs @@ -26,7 +26,7 @@ use crate::{PhysicalSortExpr, RecordBatchStream, SendableRecordBatchStream}; use arrow::datatypes::{DataType, SchemaRef}; use arrow::record_batch::RecordBatch; use arrow_array::*; -use datafusion_common::Result; +use datafusion_common::{internal_err, Result}; use datafusion_execution::memory_pool::MemoryReservation; use futures::Stream; use std::pin::Pin; @@ -63,6 +63,12 @@ pub fn streaming_merge( fetch: Option, reservation: MemoryReservation, ) -> Result { + // If there are no sort expressions, preserving the order + // doesn't mean anything (and result in inifinite loops) + if expressions.is_empty() { + return internal_err!("Sort expressions cannot be empty for streaming merge"); + } + // Special case single column comparisons with optimized cursor implementations if expressions.len() == 1 { let sort = expressions[0].clone(); From 93da6ef5e5a605d965c9855a1bb60079c3b47e45 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 11 Oct 2023 10:27:00 -0400 Subject: [PATCH 2/4] clippy --- datafusion/physical-plan/src/sorts/merge.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/sorts/merge.rs b/datafusion/physical-plan/src/sorts/merge.rs index fe1749286de9..71fce7a60c07 100644 --- a/datafusion/physical-plan/src/sorts/merge.rs +++ b/datafusion/physical-plan/src/sorts/merge.rs @@ -26,7 +26,7 @@ use crate::{PhysicalSortExpr, RecordBatchStream, SendableRecordBatchStream}; use 
arrow::datatypes::{DataType, SchemaRef}; use arrow::record_batch::RecordBatch; use arrow_array::*; -use datafusion_common::{internal_err, Result}; +use datafusion_common::{internal_err, DataFusionError, Result}; use datafusion_execution::memory_pool::MemoryReservation; use futures::Stream; use std::pin::Pin; From f608e407fbee79a7d1dc719b5d520c82b5d5f4f3 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 11 Oct 2023 11:39:31 -0400 Subject: [PATCH 3/4] Add test --- .../src/sorts/sort_preserving_merge.rs | 21 ++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index 5b485e0b68e4..8e84019f3ef6 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -287,7 +287,7 @@ mod tests { use crate::test::{self, assert_is_pending, make_partition}; use crate::{collect, common}; use arrow::array::{Int32Array, StringArray, TimestampNanosecondArray}; - use datafusion_common::assert_batches_eq; + use datafusion_common::{assert_batches_eq, assert_contains}; use super::*; @@ -339,6 +339,25 @@ mod tests { .await; } + #[tokio::test] + async fn test_merge_no_exprs() { + let task_ctx = Arc::new(TaskContext::default()); + let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 7, 9, 3])); + let batch = RecordBatch::try_from_iter(vec![("a", a)]).unwrap(); + + let schema = batch.schema(); + let sort = vec![]; // no sort expressions + let exec = MemoryExec::try_new(&[vec![batch.clone()], vec![batch]], schema, None) + .unwrap(); + let merge = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec))); + + let res = collect(merge, task_ctx).await.unwrap_err(); + assert_contains!( + res.to_string(), + "Internal error: Sort expressions cannot be empty for streaming merge" + ); + } + #[tokio::test] async fn test_merge_some_overlap() { let task_ctx = 
Arc::new(TaskContext::default()); From 9bd68a504336654f06f33e69d1f572f05bd5b08f Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 16 Oct 2023 10:26:44 -0400 Subject: [PATCH 4/4] Apply suggestions from code review Co-authored-by: jakevin Co-authored-by: Liang-Chi Hsieh --- datafusion/physical-plan/src/sorts/cursor.rs | 6 +++--- datafusion/physical-plan/src/sorts/streaming_merge.rs | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/cursor.rs b/datafusion/physical-plan/src/sorts/cursor.rs index 660d22aec429..52de880bae69 100644 --- a/datafusion/physical-plan/src/sorts/cursor.rs +++ b/datafusion/physical-plan/src/sorts/cursor.rs @@ -48,9 +48,9 @@ impl std::fmt::Debug for RowCursor { impl RowCursor { /// Create a new SortKeyCursor from `rows` and a `reservation` - /// that tracks its memory. Thre must be at least one row + /// that tracks its memory. There must be at least one row /// - /// Panic's if the reservation is not for exactly `rows.size()` + /// Panics if the reservation is not for exactly `rows.size()` /// bytes or if `rows` is empty. 
pub fn new(rows: Rows, reservation: MemoryReservation) -> Self { assert_eq!( @@ -216,7 +216,7 @@ impl FieldCursor { /// /// Panics if the array is empty pub fn new>(options: SortOptions, array: &A) -> Self { - assert!(array.len() > 0, "Empty array passed to FieldValues"); + assert!(array.len() > 0, "Empty array passed to FieldCursor"); let null_threshold = match options.nulls_first { true => array.null_count(), false => array.len() - array.null_count(), diff --git a/datafusion/physical-plan/src/sorts/streaming_merge.rs b/datafusion/physical-plan/src/sorts/streaming_merge.rs index de4f287377a2..4f8d8063853b 100644 --- a/datafusion/physical-plan/src/sorts/streaming_merge.rs +++ b/datafusion/physical-plan/src/sorts/streaming_merge.rs @@ -61,7 +61,7 @@ pub fn streaming_merge( reservation: MemoryReservation, ) -> Result { // If there are no sort expressions, preserving the order - // doesn't mean anything (and result in inifinite loops) + // doesn't mean anything (and can result in infinite loops) if expressions.is_empty() { return internal_err!("Sort expressions cannot be empty for streaming merge"); }