diff --git a/datafusion/physical-expr/src/window/nth_value.rs b/datafusion/physical-expr/src/window/nth_value.rs index 5c7c891f92d2..e913f39333f9 100644 --- a/datafusion/physical-expr/src/window/nth_value.rs +++ b/datafusion/physical-expr/src/window/nth_value.rs @@ -83,19 +83,16 @@ impl NthValue { name: impl Into, expr: Arc, data_type: DataType, - n: u32, + n: i64, ignore_nulls: bool, ) -> Result { - if ignore_nulls { - return exec_err!("NTH_VALUE ignore_nulls is not supported yet"); - } match n { 0 => exec_err!("NTH_VALUE expects n to be non-zero"), _ => Ok(Self { name: name.into(), expr, data_type, - kind: NthValueKind::Nth(n as i64), + kind: NthValueKind::Nth(n), ignore_nulls, }), } @@ -267,20 +264,37 @@ impl PartitionEvaluator for NthValueEvaluator { if index >= n_range { // Outside the range, return NULL: ScalarValue::try_from(arr.data_type()) + } else if self.ignore_nulls { + let valid_indices = valid_indices.unwrap(); + if index >= valid_indices.len() { + return ScalarValue::try_from(arr.data_type()); + } + ScalarValue::try_from_array( + &slice.unwrap(), + valid_indices[index], + ) } else { ScalarValue::try_from_array(arr, range.start + index) } } Ordering::Less => { let reverse_index = (-n) as usize; - if n_range >= reverse_index { + if n_range < reverse_index { + // Outside the range, return NULL: + ScalarValue::try_from(arr.data_type()) + } else if self.ignore_nulls { + let valid_indices = valid_indices.unwrap(); + if reverse_index > valid_indices.len() { + return ScalarValue::try_from(arr.data_type()); + } + let new_index = + valid_indices[valid_indices.len() - reverse_index]; + ScalarValue::try_from_array(&slice.unwrap(), new_index) + } else { ScalarValue::try_from_array( arr, range.start + n_range - reverse_index, ) - } else { - // Outside the range, return NULL: - ScalarValue::try_from(arr.data_type()) } } Ordering::Equal => { diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs index 6712bc855ffd..da2b24487d02 100644 --- a/datafusion/physical-plan/src/windows/mod.rs +++ b/datafusion/physical-plan/src/windows/mod.rs @@ -249,7 +249,6 @@ fn create_built_in_window_expr( .clone() .try_into() .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?; - let n: u32 = n as u32; Arc::new(NthValue::nth( name, arg, diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt index efb180b10cd8..a309a97137d2 100644 --- a/datafusion/sqllogictest/test_files/window.slt +++ b/datafusion/sqllogictest/test_files/window.slt @@ -4617,3 +4617,185 @@ NULL 1 statement ok DROP TABLE t; + +# Test for ignore nulls with ORDER BY in nth_VALUE +statement ok +CREATE TABLE t AS VALUES (3, 3), (4, 4), (null::bigint, 1), (null::bigint, 2), (5, 5), (6, 6); + +query I +SELECT column1 FROM t ORDER BY column2; +---- +NULL +NULL +3 +4 +5 +6 + +query I +SELECT nth_VALUE(column1, 3) OVER(ORDER BY column2) FROM t; +---- +NULL +NULL +3 +3 +3 +3 + +query I +SELECT nth_VALUE(column1, 3) IGNORE NULLS OVER(ORDER BY column2) FROM t; +---- +NULL +NULL +NULL +NULL +5 +5 + +query I +SELECT nth_VALUE(column1, 3) OVER(ORDER BY column2 ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) FROM t; +---- +3 +3 +3 +3 +3 +3 + +query I +SELECT nth_VALUE(column1, 3) IGNORE NULLS OVER(ORDER BY column2 ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) FROM t; +---- +5 +5 +5 +5 +5 +5 + +query I +SELECT nth_value(column1, 3) OVER(ORDER BY column2 RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING) FROM t; +---- +NULL +3 +4 +5 +6 +NULL + +query I +SELECT nth_value(column1, 3) IGNORE NULLS OVER (ORDER BY column2 RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING) FROM t; +---- +NULL +NULL +NULL +5 +6 +NULL + +statement ok +DROP TABLE t; + +# Test for negative index in NTH_VALUE +statement ok +CREATE TABLE t AS VALUES (3, 3), (4, 4), (null::bigint, 1), (null::bigint, 2), (5, 5), (6, 6); + +query I +SELECT column1 FROM t ORDER BY column2 DESC NULLS LAST; +---- +6 +5 +4 +3 +NULL +NULL + +query I +SELECT nth_VALUE(column1, -2) OVER(ORDER BY column2 DESC NULLS LAST) FROM t; +---- +NULL +6 +5 +4 +3 +NULL + +query I +SELECT nth_VALUE(column1, -2) IGNORE NULLS OVER(ORDER BY column2 DESC NULLS LAST) FROM t; +---- +NULL +6 +5 +4 +4 +4 + +query I +SELECT nth_VALUE(column1, -2) OVER(ORDER BY column2 ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) FROM t; +---- +5 +5 +5 +5 +5 +5 + +query I +SELECT nth_VALUE(column1, -2) IGNORE NULLS OVER(ORDER BY column2 DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) FROM t; +---- +4 +4 +4 +4 +4 +4 + +query I +SELECT nth_value(column1, -2) OVER (ORDER BY column2 DESC NULLS LAST RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING) FROM t; +---- +6 +5 +4 +3 +NULL +NULL + +query I +SELECT nth_value(column1, -2) IGNORE NULLS OVER (ORDER BY column2 DESC NULLS LAST RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING) FROM t; +---- +6 +5 +4 +4 +NULL +NULL + +statement ok +DROP TABLE t; + +# Test for ignore nulls with ORDER BY in NTH_VALUE with all NULLs +statement ok +CREATE TABLE t AS VALUES (null::bigint, 4), (null::bigint, 3), (null::bigint, 1), (null::bigint, 2); + +query II +SELECT NTH_VALUE(column1, 2) OVER(ORDER BY column2 DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING), column2 FROM t; +---- +NULL 4 +NULL 3 +NULL 2 +NULL 1 + +query II +SELECT NTH_VALUE(column1, 2) RESPECT NULLS OVER(ORDER BY column2 DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING), column2 FROM t; +---- +NULL 4 +NULL 3 +NULL 2 +NULL 1 + +query II +SELECT NTH_VALUE(column1, 2) IGNORE NULLS OVER(ORDER BY column2 DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING), column2 FROM t; +---- +NULL 4 +NULL 3 +NULL 2 +NULL 1